From 874f6e3a092bdaa5f151c512c9284b15f5886e82 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 19 Jan 2010 09:53:03 +0000
Subject: [PATCH] This is about refactoring the way the directory server chooses the  replication server it will connect to. This also introduces a new  (weighed) load balancing feature that spreads DS connections across the  RSs, according to the RS weights defined by the administrator,

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 1620 ++++++++++++++++++++++++++++++++++++++---------------------
 1 files changed, 1,047 insertions(+), 573 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index bd145d6..baa4310 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,12 +26,16 @@
  */
 package org.opends.server.replication.service;
 
+import java.net.UnknownHostException;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -40,10 +44,12 @@
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -51,7 +57,6 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.messages.Severity;
-import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.DSInfo;
@@ -102,7 +107,7 @@
   private Semaphore sendWindow;
   private int maxSendWindow;
   private int rcvWindow = 100;
-  private int halfRcvWindow = rcvWindow/2;
+  private int halfRcvWindow = rcvWindow / 2;
   private int maxRcvWindow = rcvWindow;
   private int timeout = 0;
   private short protocolVersion;
@@ -117,13 +122,11 @@
   private String rsServerUrl = null;
   // Our replication domain
   private ReplicationDomain domain = null;
-
   /**
    * This object is used as a conditional event to be notified about
    * the reception of monitor information from the Replication Server.
    */
   private final MutableBoolean monitorResponse = new MutableBoolean(false);
-
   /**
    * A Map containing the ServerStates of all the replicas in the topology
    * as seen by the ReplicationServer the last time it was polled or the last
@@ -131,15 +134,6 @@
    */
   private HashMap<Integer, ServerState> replicaStates =
     new HashMap<Integer, ServerState>();
-
-  /**
-   * A Map containing the ServerStates of all the replication servers in the
-   * topology as seen by the ReplicationServer the last time it was polled or
-   * the last time it published monitoring information.
-   */
-  private HashMap<Integer, ServerState> rsStates =
-    new HashMap<Integer, ServerState>();
-
   /**
    * The expected duration in milliseconds between heartbeats received
    * from the replication server.  Zero means heartbeats are off.
@@ -163,10 +157,6 @@
    */
   private boolean connectionError = false;
   private final Object connectPhaseLock = new Object();
-
-  // Same group id poller thread
-  private SameGroupIdPoller sameGroupIdPoller = null;
-
   /**
    * The thread that publishes messages to the RS containing the current
    * change time of this DS.
@@ -174,7 +164,7 @@
   private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
   /**
    * The expected period in milliseconds between these messages are sent
-   * to the replication server.  Zero means heartbeats are off.
+   * to the replication server. Zero means heartbeats are off.
    */
   private long changeTimeHeartbeatSendInterval = 0;
   /*
@@ -183,12 +173,30 @@
   // Info for other DSs.
   // Warning: does not contain info for us (for our server id)
   private List<DSInfo> dsList = new ArrayList<DSInfo>();
-  // Info for other RSs.
-  private List<RSInfo> rsList = new ArrayList<RSInfo>();
-
   private long generationID;
   private int updateDoneCount = 0;
   private boolean connectRequiresRecovery = false;
+  /**
+   * The map of replication server info initialized at connection time and
+   * regularly updated. This is used to decide to which best suitable
+   * replication server one wants to connect.
+   * Key: replication server id
+   * Value: replication server info for the matching replication server id
+   */
+  private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
+  /**
+   * This integer defines when the best replication server checking algorithm
+   * should be engaged.
+   * Every time a monitoring message (each monitoring publisher period) is
+   * received, it is incremented. When it reaches 2, we run the checking
+   * algorithm to see if we must reconnect to another best replication server.
+   * Then we reset the value to 0. But when a topology message is received, the
+   * integer is reseted to 0. This insures that we wait at least one monitoring
+   * publisher period before running the algorithm, but also that we wait at
+   * least for a monitoring period after the last received topology message
+   * (topology stabilization).
+   */
+  private int mustRunBestServerCheckingAlgorithm = 0;
 
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -228,7 +236,7 @@
     this.heartbeatInterval = heartbeatInterval;
     this.maxRcvWindow = window;
     this.maxRcvWindow = window;
-    this.halfRcvWindow = window /2;
+    this.halfRcvWindow = window / 2;
     this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
   }
 
@@ -284,7 +292,7 @@
     return rsServerId;
   }
 
- /**
+  /**
    * Gets the server id.
    * @return The server id
    */
@@ -300,9 +308,11 @@
   private long getGenerationID()
   {
     if (domain != null)
-      return domain.getGenerationID();
-    else
-      return generationID;
+    {
+      // Update the generation id
+      generationID = domain.getGenerationID();
+    }
+    return generationID;
   }
 
   /**
@@ -315,48 +325,167 @@
   }
 
   /**
-   * Bag class for keeping info we get from a server in order to compute the
-   * best one to connect to. This is in fact a wrapper to a
-   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
+   * Sets the locally configured flag for the passed ReplicationServerInfo
+   * object, analyzing the local configuration.
+   * @param
    */
-  public static class ServerInfo
+  private void updateRSInfoLocallyConfiguredStatus(
+    ReplicationServerInfo replicationServerInfo)
+  {
+    // Determine if the passed ReplicationServerInfo has a URL that is present
+    // in the locally configured replication servers
+    String rsUrl = replicationServerInfo.getServerURL();
+    if (rsUrl == null)
+    {
+      // The ReplicationServerInfo has been generated from a server with
+      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
+      // ignore this server as we do not know how to connect to it
+      replicationServerInfo.setLocallyConfigured(false);
+      return;
+    }
+    for (String serverUrl : servers)
+    {
+      if (isSameReplicationServerUrl(serverUrl, rsUrl))
+      {
+        // This RS is locally configured, mark this
+        replicationServerInfo.setLocallyConfigured(true);
+        return;
+      }
+    }
+    replicationServerInfo.setLocallyConfigured(false);
+  }
+
+  /**
+   * Compares 2 replication servers addresses and returns true if they both
+   * represent the same replication server instance.
+   * @param rs1Url Replication server 1 address
+   * @param rs2Url Replication server 2 address
+   * @return True if both replication server addresses represent the same
+   * replication server instance, false otherwise.
+   */
+  private static boolean isSameReplicationServerUrl(String rs1Url,
+    String rs2Url)
+  {
+    // Get and compare ports of RS1 and RS2
+    int separator1 = rs1Url.lastIndexOf(':');
+    if (separator1 < 0)
+    {
+      // Not a RS url: should not happen
+      return false;
+    }
+    int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1));
+
+    int separator2 = rs2Url.lastIndexOf(':');
+    if (separator2 < 0)
+    {
+      // Not a RS url: should not happen
+      return false;
+    }
+    int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1));
+
+    if (rs1Port != rs2Port)
+    {
+      return false;
+    }
+
+    // Get and compare addresses of RS1 and RS2
+    String rs1 = rs1Url.substring(0, separator1);
+    InetAddress[] rs1Addresses = null;
+    try
+    {
+      if (rs1.equals("localhost") || rs1.equals("127.0.0.1"))
+      {
+        // Replace localhost with the local official hostname
+        rs1 = InetAddress.getLocalHost().getHostName();
+      }
+      rs1Addresses = InetAddress.getAllByName(rs1);
+    } catch (UnknownHostException ex)
+    {
+      // Unknown RS: should not happen
+      return false;
+    }
+
+    String rs2 = rs2Url.substring(0, separator2);
+    InetAddress[] rs2Addresses = null;
+    try
+    {
+      if (rs2.equals("localhost") || rs2.equals("127.0.0.1"))
+      {
+        // Replace localhost with the local official hostname
+        rs2 = InetAddress.getLocalHost().getHostName();
+      }
+      rs2Addresses = InetAddress.getAllByName(rs2);
+    } catch (UnknownHostException ex)
+    {
+      // Unknown RS: should not happen
+      return false;
+    }
+
+    // Now compare addresses, if at least one match, this is the same server
+    for (int i = 0; i < rs1Addresses.length; i++)
+    {
+      InetAddress inetAddress1 = rs1Addresses[i];
+      for (int j = 0; j < rs2Addresses.length; j++)
+      {
+        InetAddress inetAddress2 = rs2Addresses[j];
+        if (inetAddress2.equals(inetAddress1))
+        {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Bag class for keeping info we get from a replication server in order to
+   * compute the best one to connect to. This is in fact a wrapper to a
+   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
+   * updated with a info coming from received topology messages or monitoring
+   * messages.
+   */
+  public static class ReplicationServerInfo
   {
     private short protocolVersion;
     private long generationId;
     private byte groupId = (byte) -1;
     private int serverId;
+    // Received server URL
     private String serverURL;
     private String baseDn = null;
     private int windowSize;
-    private ServerState serverState;
+    private ServerState serverState = null;
     private boolean sslEncryption;
     private int degradedStatusThreshold = -1;
-    // Keeps the -1 value if created with a ReplServerStartMsg
-    private int weight = -1;
-    // Keeps the -1 value if created with a ReplServerStartMsg
-    private int connectedDSNumber = -1;
+    // Keeps the 1 value if created with a ReplServerStartMsg
+    private int weight = 1;
+    // Keeps the 0 value if created with a ReplServerStartMsg
+    private int connectedDSNumber = 0;
+    private List<Integer> connectedDSs = null;
+    // Is this RS locally configured ? (the RS is recognized as a usable server)
+    private boolean locallyConfigured = true;
 
     /**
-     * Create a new instance of ServerInfo wrapping the passed message.
+     * Create a new instance of ReplicationServerInfo wrapping the passed
+     * message.
      * @param msg Message to wrap.
      * @return The new instance wrapping the passed message.
      * @throws IllegalArgumentException If the passed message has an unexpected
      *                                  type.
      */
-    public static ServerInfo newServerInfo(
+    public static ReplicationServerInfo newInstance(
       ReplicationMsg msg) throws IllegalArgumentException
     {
       if (msg instanceof ReplServerStartMsg)
       {
         // This is a ReplServerStartMsg (RS uses protocol V3 or under)
-        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
-        return new ServerInfo(replServerStartMsg);
-      }
-      else if (msg instanceof ReplServerStartDSMsg)
+        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
+        return new ReplicationServerInfo(replServerStartMsg);
+      } else if (msg instanceof ReplServerStartDSMsg)
       {
         // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
-        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
-        return new ServerInfo(replServerStartDSMsg);
+        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
+        return new ReplicationServerInfo(replServerStartDSMsg);
       }
 
       // Unsupported message type: should not happen
@@ -365,10 +494,10 @@
     }
 
     /**
-     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+     * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
      * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
      */
-    private ServerInfo(ReplServerStartMsg replServerStartMsg)
+    private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
     {
       this.protocolVersion = replServerStartMsg.getVersion();
       this.generationId = replServerStartMsg.getGenerationId();
@@ -384,11 +513,12 @@
     }
 
     /**
-     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+     * Constructs a ReplicationServerInfo object wrapping a
+     * ReplServerStartDSMsg.
      * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
      * wrap.
      */
-    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+    private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
     {
       this.protocolVersion = replServerStartDSMsg.getVersion();
       this.generationId = replServerStartDSMsg.getGenerationId();
@@ -514,16 +644,98 @@
     {
       return connectedDSNumber;
     }
+
+    /**
+     * Constructs a new replication server info with the passed RSInfo
+     * internal values and the passed connected DSs.
+     * @param rsInfo The RSinfo to use for the update
+     * @param connectedDSs The new connected DSs
+     */
+    public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
+    {
+      this.serverId = rsInfo.getId();
+      this.serverURL = rsInfo.getServerUrl();
+      this.generationId = rsInfo.getGenerationId();
+      this.groupId = rsInfo.getGroupId();
+      this.weight = rsInfo.getWeight();
+      this.connectedDSs = connectedDSs;
+      this.connectedDSNumber = connectedDSs.size();
+    }
+
+    /**
+     * Converts the object to a RSInfo object.
+     * @return The RSInfo object matching this object.
+     */
+    public RSInfo toRSInfo()
+    {
+      return new RSInfo(serverId, serverURL, generationId, groupId, weight);
+    }
+
+    /**
+     * Updates replication server info with the passed RSInfo internal values
+     * and the passed connected DSs.
+     * @param rsInfo The RSinfo to use for the update
+     * @param connectedDSs The new connected DSs
+     */
+    public void update(RSInfo rsInfo, List<Integer> connectedDSs)
+    {
+      this.generationId = rsInfo.getGenerationId();
+      this.groupId = rsInfo.getGroupId();
+      this.weight = rsInfo.getWeight();
+      this.connectedDSs = connectedDSs;
+      this.connectedDSNumber = connectedDSs.size();
+    }
+
+    /**
+     * Updates replication server info with the passed server state.
+     * @param serverState The ServerState to use for the update
+     */
+    public void update(ServerState serverState)
+    {
+      if (this.serverState != null)
+      {
+        this.serverState.update(serverState);
+      } else
+      {
+        this.serverState = serverState;
+      }
+    }
+
+    /**
+     * Get the getConnectedDSs.
+     * @return the getConnectedDSs
+     */
+    public List<Integer> getConnectedDSs()
+    {
+      return connectedDSs;
+    }
+
+    /**
+     * Gets the locally configured status for this RS.
+     * @return the locallyConfigured
+     */
+    public boolean isLocallyConfigured()
+    {
+      return locallyConfigured;
+    }
+
+    /**
+     * Sets the locally configured status for this RS.
+     * @param locallyConfigured the locallyConfigured to set
+     */
+    public void setLocallyConfigured(boolean locallyConfigured)
+    {
+      this.locallyConfigured = locallyConfigured;
+    }
   }
 
   private void connect()
   {
     if (this.baseDn.compareToIgnoreCase(
-        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
+      ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
     {
       connectAsECL();
-    }
-    else
+    } else
     {
       connectAsDataServer();
     }
@@ -534,19 +746,22 @@
    * able to choose the more suitable.
    * @return the collected information.
    */
-  private Map<String, ServerInfo> collectReplicationServersInfo() {
+  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
+  {
 
-    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+    Map<Integer, ReplicationServerInfo> rsInfos =
+      new HashMap<Integer, ReplicationServerInfo>();
 
     for (String server : servers)
     {
       // Connect to server and get info about it
-      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+      ReplicationServerInfo replicationServerInfo =
+        performPhaseOneHandshake(server, false);
 
       // Store server info in list
-      if (serverInfo != null)
+      if (replicationServerInfo != null)
       {
-        rsInfos.put(server, serverInfo);
+        rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
       }
     }
 
@@ -558,7 +773,6 @@
    * are :
    * - 1 single RS configured
    * - so no choice of the preferred RS
-   * - No same groupID polling
    * - ?? Heartbeat
    * - Start handshake is :
    *    Broker ---> StartECLMsg       ---> RS
@@ -570,10 +784,10 @@
     // FIXME:ECL List of RS to connect is for now limited to one RS only
     String bestServer = this.servers.iterator().next();
 
-    ReplServerStartDSMsg inReplServerStartDSMsg
-      = performECLPhaseOneHandshake(bestServer, true);
+    ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
+      bestServer, true);
 
-    if (inReplServerStartDSMsg!=null)
+    if (inReplServerStartDSMsg != null)
       performECLPhaseTwoHandshake(bestServer);
   }
 
@@ -589,7 +803,7 @@
    *
    * phase 1:
    * DS --- ServerStartMsg ---> RS
-   * DS <--- ReplServerStartMsg --- RS
+   * DS <--- ReplServerStartDSMsg --- RS
    * phase 2:
    * DS --- StartSessionMsg ---> RS
    * DS <--- TopologyMsg --- RS
@@ -615,9 +829,9 @@
     }
 
     // Stop any existing poller and heartbeat monitor from a previous session.
-    stopSameGroupIdPoller();
     stopRSHeartBeatMonitoring();
     stopChangeTimeHeartBeatPublishing();
+    mustRunBestServerCheckingAlgorithm = 0;
 
     boolean newServerWithSameGroupId = false;
     synchronized (connectPhaseLock)
@@ -628,41 +842,47 @@
        */
       if (debugEnabled())
         TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
-            " order to elect the preferred one");
+          " order to elect the preferred one");
 
       // Get info from every available replication servers
-      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
+      replicationServerInfos = collectReplicationServersInfo();
 
-      ServerInfo serverInfo = null;
+      ReplicationServerInfo replicationServerInfo = null;
 
-      if (rsInfos.size() > 0)
+      if (replicationServerInfos.size() > 0)
       {
         // At least one server answered, find the best one.
-        String bestServer = computeBestReplicationServer(state, rsInfos,
-          serverId, baseDn, groupId);
+        replicationServerInfo = computeBestReplicationServer(true, -1, state,
+          replicationServerInfos, serverId, baseDn, groupId,
+          this.getGenerationID());
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
           TRACER.debugInfo(
-              "phase 2 : will perform PhaseOneH with the preferred RS.");
-        serverInfo = performPhaseOneHandshake(bestServer, true);
+            "phase 2 : will perform PhaseOneH with the preferred RS.");
+        replicationServerInfo = performPhaseOneHandshake(
+          replicationServerInfo.getServerURL(), true);
+        // Update replication server info with potentially more up to date data
+        // (server state for instance may have changed)
+        replicationServerInfos.put(replicationServerInfo.getServerId(),
+          replicationServerInfo);
 
-        if (serverInfo != null) // Handshake phase 1 exchange went well
-
+        if (replicationServerInfo != null)
         {
+          // Handshake phase 1 exchange went well
+
           // Compute in which status we are starting the session to tell the RS
           ServerStatus initStatus =
-            computeInitialServerStatus(serverInfo.getGenerationId(),
-            serverInfo.getServerState(),
-            serverInfo.getDegradedStatusThreshold(),
+            computeInitialServerStatus(replicationServerInfo.getGenerationId(),
+            replicationServerInfo.getServerState(),
+            replicationServerInfo.getDegradedStatusThreshold(),
             this.getGenerationID());
 
-          // Perfom session start (handshake phase 2)
-          TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
-            initStatus);
+          // Perform session start (handshake phase 2)
+          TopologyMsg topologyMsg = performPhaseTwoHandshake(
+            replicationServerInfo.getServerURL(), initStatus);
 
           if (topologyMsg != null) // Handshake phase 2 exchange went well
-
           {
             try
             {
@@ -681,7 +901,7 @@
                * reconnection at that time to retrieve a server with our group
                * id.
                */
-              byte tmpRsGroupId = serverInfo.getGroupId();
+              byte tmpRsGroupId = replicationServerInfo.getGroupId();
               boolean someServersWithSameGroupId =
                 hasSomeServerWithSameGroupId(topologyMsg.getRsList());
 
@@ -690,10 +910,10 @@
                 ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
               {
                 replicationServer = session.getReadableRemoteAddress();
-                maxSendWindow = serverInfo.getWindowSize();
-                rsGroupId = serverInfo.getGroupId();
-                rsServerId = serverInfo.getServerId();
-                rsServerUrl = bestServer;
+                maxSendWindow = replicationServerInfo.getWindowSize();
+                rsGroupId = replicationServerInfo.getGroupId();
+                rsServerId = replicationServerInfo.getServerId();
+                rsServerUrl = replicationServerInfo.getServerURL();
 
                 receiveTopo(topologyMsg);
 
@@ -715,27 +935,27 @@
                 if (domain != null)
                 {
                   domain.sessionInitiated(
-                      initStatus, serverInfo.getServerState(),
-                      serverInfo.getGenerationId(),
-                      session);
+                    initStatus, replicationServerInfo.getServerState(),
+                    replicationServerInfo.getGenerationId(),
+                    session);
                 }
 
                 if (getRsGroupId() != groupId)
                 {
-                 // Connected to replication server with wrong group id:
-                 // warn user and start poller to recover when a server with
-                 // right group id arrives...
-                 Message message =
-                   WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
-                   Byte.toString(groupId),  Integer.toString(rsServerId),
-                   bestServer, Byte.toString(getRsGroupId()),
-                   baseDn.toString(), Integer.toString(serverId));
-                 logError(message);
-                 startSameGroupIdPoller();
+                  // Connected to replication server with wrong group id:
+                  // warn user and start poller to recover when a server with
+                  // right group id arrives...
+                  Message message =
+                    WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+                    Byte.toString(groupId), Integer.toString(rsServerId),
+                    replicationServerInfo.getServerURL(),
+                    Byte.toString(getRsGroupId()),
+                    baseDn.toString(), Integer.toString(serverId));
+                  logError(message);
                 }
                 startRSHeartBeatMonitoring();
-                if (serverInfo.getProtocolVersion()
-                    >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+                if (replicationServerInfo.getProtocolVersion() >=
+                  ProtocolVersion.REPLICATION_PROTOCOL_V3)
                 {
                   startChangeTimeHeartBeatPublishing();
                 }
@@ -753,7 +973,7 @@
             } catch (Exception e)
             {
               Message message = ERR_COMPUTING_FAKE_OPS.get(
-                baseDn, bestServer,
+                baseDn, replicationServerInfo.getServerURL(),
                 e.getLocalizedMessage() + stackTraceToSingleLineString(e));
               logError(message);
             } finally
@@ -783,8 +1003,9 @@
       {
         connectPhaseLock.notify();
 
-        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
-          (serverInfo.getGenerationId() == -1))
+        if ((replicationServerInfo.getGenerationId() ==
+          this.getGenerationID()) ||
+          (replicationServerInfo.getGenerationId() == -1))
         {
           Message message =
             NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -801,7 +1022,7 @@
             baseDn.toString(),
             replicationServer,
             Long.toString(this.getGenerationID()),
-            Long.toString(serverInfo.getGenerationId()));
+            Long.toString(replicationServerInfo.getGenerationId()));
           logError(message);
         }
       } else
@@ -908,7 +1129,7 @@
   /**
    * Connect to the provided server performing the first phase handshake
    * (start messages exchange) and return the reply message from the replication
-   * server, wrapped in a ServerInfo object.
+   * server, wrapped in a ReplicationServerInfo object.
    *
    * @param server Server to connect to.
    * @param keepConnection Do we keep session opened or not after handshake.
@@ -917,10 +1138,10 @@
    * @return The answer from the server . Null if could not
    *         get an answer.
    */
-  private ServerInfo performPhaseOneHandshake(String server,
+  private ReplicationServerInfo performPhaseOneHandshake(String server,
     boolean keepConnection)
   {
-    ServerInfo serverInfo = null;
+    ReplicationServerInfo replServerInfo = null;
 
     // Parse server string.
     int separator = server.lastIndexOf(':');
@@ -962,17 +1183,17 @@
       ReplicationMsg msg = localSession.receive();
 
       if (debugEnabled())
-        {
-          TRACER.debugInfo("In RB for " + baseDn +
-            "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
-            "\nAND RECEIVED:\n" + msg.toString());
-        }
+      {
+        TRACER.debugInfo("In RB for " + baseDn +
+          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+          "\nAND RECEIVED:\n" + msg.toString());
+      }
 
       // Wrap received message in a server info object
-      serverInfo = ServerInfo.newServerInfo(msg);
+      replServerInfo = ReplicationServerInfo.newInstance(msg);
 
       // Sanity check
-      String repDn = serverInfo.getBaseDn();
+      String repDn = replServerInfo.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
         Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -987,7 +1208,7 @@
        * if it is an old replication server).
        */
       protocolVersion = ProtocolVersion.minWithCurrent(
-        serverInfo.getProtocolVersion());
+        replServerInfo.getProtocolVersion());
       localSession.setProtocolVersion(protocolVersion);
 
 
@@ -1017,7 +1238,7 @@
       error = true;
     } catch (Exception e)
     {
-      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+      if ((e instanceof SocketTimeoutException) && debugEnabled())
       {
         TRACER.debugInfo("Timeout trying to connect to RS " + server +
           " for dn: " + baseDn);
@@ -1068,7 +1289,7 @@
       }
       if (error)
       {
-        serverInfo = null;
+        replServerInfo = null;
       } // Be sure to return null.
 
     }
@@ -1080,7 +1301,7 @@
       session = localSession;
     }
 
-    return serverInfo;
+    return replServerInfo;
   }
 
   /**
@@ -1126,11 +1347,11 @@
 
       // Send our start msg.
       ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
-          baseDn, 0, 0, 0, 0,
-          maxRcvWindow, heartbeatInterval, state,
-          ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
-          isSslEncryption,
-          groupId);
+        baseDn, 0, 0, 0, 0,
+        maxRcvWindow, heartbeatInterval, state,
+        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+        isSslEncryption,
+        groupId);
       localSession.publish(serverStartECLMsg);
 
       // Read the ReplServerStartMsg that should come back.
@@ -1189,7 +1410,7 @@
       error = true;
     } catch (Exception e)
     {
-      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+      if ((e instanceof SocketTimeoutException) && debugEnabled())
       {
         TRACER.debugInfo("Timeout trying to connect to RS " + server +
           " for dn: " + baseDn);
@@ -1282,7 +1503,7 @@
       {
         TRACER.debugInfo("In RB for " + baseDn +
           "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
-      //  +   "\nAND RECEIVED:\n" + topologyMsg.toString());
+        //  +   "\nAND RECEIVED:\n" + topologyMsg.toString());
       }
 
       // Alright set the timeout to the desired value
@@ -1340,13 +1561,13 @@
       {
         startSessionMsg =
           new StartSessionMsg(
-              initStatus,
-              domain.getRefUrls(),
-              domain.isAssured(),
-              domain.getAssuredMode(),
-              domain.getAssuredSdLevel());
+          initStatus,
+          domain.getRefUrls(),
+          domain.isAssured(),
+          domain.getAssuredMode(),
+          domain.getAssuredSdLevel());
         startSessionMsg.setEclIncludes(
-            domain.getEclInclude());
+          domain.getEclInclude());
       } else
       {
         startSessionMsg =
@@ -1395,269 +1616,560 @@
 
   /**
    * Returns the replication server that best fits our need so that we can
-   * connect to it.
-   * This methods performs some filtering on the group id, then call
-   * the real search for best server algorithm (searchForBestReplicationServer).
+   * connect to it or determine if we must disconnect from current one to
+   * re-connect to best server.
    *
-   * Note: this method put as public static for unit testing purpose.
+   * Note: this method is static for test purpose (access from unit tests)
    *
+   * @param firstConnection True if we run this method for the very first
+   * connection of the broker. False if we run this method to determine if the
+   * replication server we are currently connected to is still the best or not.
+   * @param rsServerId The id of the replication server we are currently
+   * connected to. Only used when firstConnection is false.
    * @param myState The local server state.
    * @param rsInfos The list of available replication servers and their
-   *                 associated information (choice will be made among them).
+   * associated information (choice will be made among them).
    * @param localServerId The server id for the suffix we are working for.
    * @param baseDn The suffix for which we are working for.
    * @param groupId The groupId we prefer being connected to if possible
-   * @return The computed best replication server.
+   * @param generationId The generation id we are using
+   * @return The computed best replication server. If the returned value is
+   * null, the best replication server is undetermined but the local server must
+   * disconnect (so the best replication server is another one than the current
+   * one). Null can only be returned when firstConnection is false.
    */
-  public static String computeBestReplicationServer(ServerState myState,
-    Map<String, ServerInfo> rsInfos, int localServerId,
-    String baseDn, byte groupId)
+  public static ReplicationServerInfo computeBestReplicationServer(
+    boolean firstConnection, int rsServerId, ServerState myState,
+    Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
+    String baseDn, byte groupId, long generationId)
   {
-    /*
-     * Preference is given to servers with the requested group id:
-     * If there are some servers with the requested group id in the provided
-     * server list, then we run the search algorithm only on them. If no server
-     * with the requested group id, consider all of them.
-     */
-
-    // Filter for servers with same group id
-    Map<String, ServerInfo> sameGroupIdRsInfos =
-      new HashMap<String, ServerInfo>();
-
-    for (String repServer : rsInfos.keySet())
-    {
-      ServerInfo serverInfo = rsInfos.get(repServer);
-      if (serverInfo.getGroupId() == groupId)
-        sameGroupIdRsInfos.put(repServer, serverInfo);
-    }
-
-    // Some servers with same group id ?
-    if (sameGroupIdRsInfos.size() > 0)
-    {
-      return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
-        localServerId, baseDn);
-    } else
-    {
-      return searchForBestReplicationServer(myState, rsInfos,
-        localServerId, baseDn);
-    }
-  }
-
-  /**
-   * Returns the replication server that best fits our need so that we can
-   * connect to it.
-   *
-   * Note: this method put as public static for unit testing purpose.
-   *
-   * @param myState The local server state.
-   * @param rsInfos The list of available replication servers and their
-   *                 associated information (choice will be made among them).
-   * @param localServerID The server id for the suffix we are working for.
-   * @param baseDn The suffix for which we are working for.
-   * @return The computed best replication server.
-   */
-  private static String searchForBestReplicationServer(ServerState myState,
-    Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
-  {
-    /*
-     * Find replication servers who are up to date (or more up to date than us,
-     * if for instance we failed and restarted, having sent some changes to the
-     * RS but without having time to store our own state) regarding our own
-     * server id. Then, among them, choose the server that is the most up to
-     * date regarding the whole topology.
-     *
-     * If no server is up to date regarding our own server id, find the one who
-     * is the most up to date regarding our server id.
-     */
-
-    // Should never happen (sanity check)
-    if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) ||
-      (baseDn == null))
-    {
-      return null;
-    }
 
     // Shortcut, if only one server, this is the best
     if (rsInfos.size() == 1)
     {
-      for (String repServer : rsInfos.keySet())
-        return repServer;
+      return rsInfos.values().iterator().next();
     }
 
-    String bestServer = null;
-    boolean bestServerIsLocal = false;
-
-    // Servers up to dates with regard to our changes
-    HashMap<String, ServerState> upToDateServers =
-      new HashMap<String, ServerState>();
-    // Servers late with regard to our changes
-    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
-
-    /*
-     * Start loop to differentiate up to date servers from late ones.
+    /**
+     * Apply some filtering criteria to determine the best servers list from
+     * the available ones. The ordered list of criteria is (from more important
+     * to less important):
+     * - replication server has the same group id as the local DS one
+     * - replication server has the same generation id as the local DS one
+     * - replication server is up to date regarding changes generated by the
+     *   local DS
+     * - replication server in the same VM as local DS one
      */
-    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
-    if (myChangeNumber == null)
+    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
+    Map<Integer, ReplicationServerInfo> newBestServers;
+    // The list of best replication servers is filtered with each criteria. At
+    // each criteria, the list is replaced with the filtered one if some there
+    // are some servers from the filtering, otherwise, the list is left as is
+    // and the new filtering for the next criteria is applied and so on.
+    for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
     {
-      myChangeNumber = new ChangeNumber(0, 0, localServerID);
-    }
-    for (String repServer : rsInfos.keySet())
-    {
-
-      ServerState rsState = rsInfos.get(repServer).getServerState();
-      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
-      if (rsChangeNumber == null)
+      newBestServers = null;
+      switch (filterLevel)
       {
-        rsChangeNumber = new ChangeNumber(0, 0, localServerID);
+        case 1:
+          // Use only servers locally configured: those are servers declared in
+          // the local configuration. When the current method is called, for
+          // sure, at least one server from the list is locally configured
+          bestServers = filterServersLocallyConfigured(bestServers);
+          break;
+        case 2:
+          // Some servers with same group id ?
+          newBestServers = filterServersWithSameGroupId(bestServers, groupId);
+          if (newBestServers.size() > 0)
+          {
+            bestServers = newBestServers;
+          }
+          break;
+        case 3:
+          // Some servers with same generation id ?
+          newBestServers = filterServersWithSameGenerationId(bestServers,
+            generationId);
+          if (newBestServers.size() > 0)
+          {
+            // Ok some servers with the right generation id
+            bestServers = newBestServers;
+            // If some servers with the right generation id this is useful to
+            // run the local DS change criteria
+            newBestServers = filterServersWithAllLocalDSChanges(bestServers,
+              myState, localServerId);
+            if (newBestServers.size() > 0)
+            {
+              bestServers = newBestServers;
+            }
+          }
+          break;
+        case 4:
+          // Some servers in the local VM ?
+          newBestServers = filterServersInSameVM(bestServers);
+          if (newBestServers.size() > 0)
+          {
+            bestServers = newBestServers;
+          }
+          break;
       }
+    }
 
-      // Store state in right list
-      if (myChangeNumber.olderOrEqual(rsChangeNumber))
+    /**
+     * Now apply the choice base on the weight to the best servers list
+     */
+    if (bestServers.size() > 1)
+    {
+      if (firstConnection)
       {
-        upToDateServers.put(repServer, rsState);
+        // We are no connected to a server yet
+        return computeBestServerForWeight(bestServers, -1, -1);
       } else
       {
-        lateOnes.put(repServer, rsState);
+        // We are already connected to a RS: compute the best RS as far as the
+        // weights is concerned. If this is another one, some DS must
+        // disconnect.
+        return computeBestServerForWeight(bestServers, rsServerId,
+          localServerId);
       }
-    }
-
-    if (upToDateServers.size() > 0)
-    {
-      /*
-       * Some up to date servers, among them, choose either :
-       * - The local one
-       * - The one that has the maximum number of changes to send us.
-       *   This is the most up to date one regarding the whole topology.
-       *   This server is the one which has the less
-       *   difference with the topology server state.
-       *   For comparison, we need to compute the difference for each
-       *   server id with the topology server state.
-       */
-
-      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
-        upToDateServers.size(), baseDn, Integer.toString(localServerID));
-      logError(message);
-
-      /*
-       * First of all, compute the virtual server state for the whole topology,
-       * which is composed of the most up to date change numbers for
-       * each server id in the topology.
-       */
-      ServerState topoState = new ServerState();
-      for (ServerState curState : upToDateServers.values())
-      {
-
-        Iterator<Integer> it = curState.iterator();
-        while (it.hasNext())
-        {
-          Integer sId = it.next();
-          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
-          if (curSidCn == null)
-          {
-            curSidCn = new ChangeNumber(0, 0, sId);
-          }
-          // Update topology state
-          topoState.update(curSidCn);
-        }
-      } // For up to date servers
-
-      // Min of the max shifts
-      long minShift = -1L;
-      for (String upServer : upToDateServers.keySet())
-      {
-
-        /*
-         * Compute the maximum difference between the time of a server id's
-         * change number and the time of the matching server id's change
-         * number in the topology server state.
-         *
-         * Note: we could have used the sequence number here instead of the
-         * timestamp, but this would have caused a problem when the sequence
-         * number loops and comes back to 0 (computation would have becomen
-         * meaningless).
-         */
-        long shift = 0;
-        ServerState curState = upToDateServers.get(upServer);
-        Iterator<Integer> it = curState.iterator();
-        while (it.hasNext())
-        {
-          Integer sId = it.next();
-          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
-          if (curSidCn == null)
-          {
-            curSidCn = new ChangeNumber(0, 0, sId);
-          }
-          // Cannot be null as checked at construction time
-          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
-          // Cannot be negative as topoState computed as being the max CN
-          // for each server id in the topology
-          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
-          shift +=tmpShift;
-        }
-
-        boolean upServerIsLocal =
-          ReplicationServer.isLocalReplicationServer(upServer);
-        if ((minShift < 0) // First time in loop
-            || ((shift < minShift) && upServerIsLocal)
-            || (((bestServerIsLocal == false) && (shift < minShift)))
-            || ((bestServerIsLocal == false) && (upServerIsLocal &&
-                                              (shift<(minShift + 60)) ))
-            || (shift+120 < minShift))
-        {
-          // This server is even closer to topo state
-          bestServer = upServer;
-          bestServerIsLocal = upServerIsLocal;
-          minShift = shift;
-        }
-      } // For up to date servers
-
     } else
     {
-      /*
-       * We could not find a replication server that has seen all the
-       * changes that this server has already processed,
-       */
-      // lateOnes cannot be empty
-      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
-        baseDn, lateOnes.size());
-      logError(message);
-
-      // Min of the shifts
-      long minShift = -1L;
-      for (String lateServer : lateOnes.keySet())
-      {
-        /*
-         * Choose the server who is the closest to us regarding our server id
-         * (this is the most up to date regarding our server id).
-         */
-        ServerState curState = lateOnes.get(lateServer);
-        ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
-        if (ourSidCn == null)
-        {
-          ourSidCn = new ChangeNumber(0, 0, localServerID);
-        }
-        // Cannot be negative as our Cn for our server id is strictly
-        // greater than those of the servers in late server list
-        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
-
-        boolean lateServerisLocal =
-          ReplicationServer.isLocalReplicationServer(lateServer);
-        if ((minShift < 0) // First time in loop
-          || ((tmpShift < minShift) && lateServerisLocal)
-          || (((bestServerIsLocal == false) && (tmpShift < minShift)))
-          || ((bestServerIsLocal == false) && (lateServerisLocal &&
-                                            (tmpShift<(minShift + 60)) ))
-          || (tmpShift+120 < minShift))
-        {
-          // This server is even closer to topo state
-          bestServer = lateServer;
-          bestServerIsLocal = lateServerisLocal;
-          minShift = tmpShift;
-        }
-      } // For late servers
-
+      return bestServers.values().iterator().next();
     }
-    return bestServer;
+  }
+
+  /**
+   * Creates a new list that contains only replication servers that are locally
+   * configured.
+   * @param bestServers The list of replication servers to filter
+   * @return The sub list of replication servers locally configured
+   */
+  private static Map<Integer, ReplicationServerInfo>
+    filterServersLocallyConfigured(Map<Integer,
+    ReplicationServerInfo> bestServers)
+  {
+    Map<Integer, ReplicationServerInfo> result =
+      new HashMap<Integer, ReplicationServerInfo>();
+
+    for (Integer rsId : bestServers.keySet())
+    {
+      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+      if (replicationServerInfo.isLocallyConfigured())
+      {
+        result.put(rsId, replicationServerInfo);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates a new list that contains only replication servers that have the
+   * passed group id, from a passed replication server list.
+   * @param bestServers The list of replication servers to filter
+   * @param groupId The group id that must match
+   * @return The sub list of replication servers matching the requested group id
+   * (which may be empty)
+   */
+  private static Map<Integer, ReplicationServerInfo>
+    filterServersWithSameGroupId(Map<Integer,
+    ReplicationServerInfo> bestServers, byte groupId)
+  {
+    Map<Integer, ReplicationServerInfo> result =
+      new HashMap<Integer, ReplicationServerInfo>();
+
+    for (Integer rsId : bestServers.keySet())
+    {
+      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+      if (replicationServerInfo.getGroupId() == groupId)
+      {
+        result.put(rsId, replicationServerInfo);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates a new list that contains only replication servers that have the
+   * passed generation id, from a passed replication server list.
+   * @param bestServers The list of replication servers to filter
+   * @param generationId The generation id that must match
+   * @return The sub list of replication servers matching the requested
+   * generation id (which may be empty)
+   */
+  private static Map<Integer, ReplicationServerInfo>
+    filterServersWithSameGenerationId(Map<Integer,
+    ReplicationServerInfo> bestServers, long generationId)
+  {
+    Map<Integer, ReplicationServerInfo> result =
+      new HashMap<Integer, ReplicationServerInfo>();
+
+    for (Integer rsId : bestServers.keySet())
+    {
+      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+      if (replicationServerInfo.getGenerationId() == generationId)
+      {
+        result.put(rsId, replicationServerInfo);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates a new list that contains only replication servers that have the
+   * latest changes from the passed DS, from a passed replication server list.
+   * @param bestServers The list of replication servers to filter
+   * @param localState The state of the local DS
+   * @param localServerId The server id to consider for the changes
+   * @return The sub list of replication servers that have the latest changes
+   * from the passed DS (which may be empty)
+   */
+  private static Map<Integer, ReplicationServerInfo>
+    filterServersWithAllLocalDSChanges(Map<Integer,
+    ReplicationServerInfo> bestServers, ServerState localState,
+    int localServerId)
+  {
+    Map<Integer, ReplicationServerInfo> upToDateServers =
+      new HashMap<Integer, ReplicationServerInfo>();
+    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
+      new HashMap<Integer, ReplicationServerInfo>();
+
+    // Extract the change number of the latest change generated by the local
+    // server
+    ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId);
+    if (myChangeNumber == null)
+    {
+      myChangeNumber = new ChangeNumber(0, 0, localServerId);
+    }
+
+    /**
+     * Find replication servers who are up to date (or more up to date than us,
+     * if for instance we failed and restarted, having sent some changes to the
+     * RS but without having time to store our own state) regarding our own
+     * server id. If some servers more up to date, prefer this list.
+     */
+    for (Integer rsId : bestServers.keySet())
+    {
+      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+      ServerState rsState = replicationServerInfo.getServerState();
+      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId);
+      if (rsChangeNumber == null)
+      {
+        rsChangeNumber = new ChangeNumber(0, 0, localServerId);
+      }
+
+      // Has this replication server the latest local change ?
+      if (myChangeNumber.olderOrEqual(rsChangeNumber))
+      {
+        if (myChangeNumber.equals(rsChangeNumber))
+        {
+          // This replication server has exactly the latest change from the
+          // local server
+          upToDateServers.put(rsId, replicationServerInfo);
+        } else
+        {
+          // This replication server is even more up to date than the local
+          // server
+          moreUpToDateServers.put(rsId, replicationServerInfo);
+        }
+      }
+    }
+    if (moreUpToDateServers.size() > 0)
+    {
+      // Prefer servers more up to date than local server
+      return moreUpToDateServers;
+    } else
+    {
+      return upToDateServers;
+    }
+  }
+
+  /**
+   * Creates a new list that contains only replication servers that are in the
+   * same VM as the local DS, from a passed replication server list.
+   * @param bestServers The list of replication servers to filter
+   * @return The sub list of replication servers being in the same VM as the
+   * local DS (which may be empty)
+   */
+  private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
+    Map<Integer, ReplicationServerInfo> bestServers)
+  {
+    Map<Integer, ReplicationServerInfo> result =
+      new HashMap<Integer, ReplicationServerInfo>();
+
+    for (Integer rsId : bestServers.keySet())
+    {
+      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+      if (ReplicationServer.isLocalReplicationServer(
+        replicationServerInfo.getServerURL()))
+      {
+        result.put(rsId, replicationServerInfo);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Computes the best replication server the local server should be connected
+   * to so that the load is correctly spread across the topology, following the
+   * weights guidance.
+   * Warning: This method is expected to be called with at least 2 servers in
+   * bestServers
+   * Note: this method is static for test purpose (access from unit tests)
+   * @param bestServers The list of replication servers to consider
+   * @param currentRsServerId The replication server the local server is
+   *        currently connected to. -1 if the local server is not yet connected
+   *        to any replication server.
+   * @param localServerId The server id of the local server. This is not used
+   *        when it is not connected to a replication server
+   *        (currentRsServerId = -1)
+   * @return The replication server the local server should be connected to
+   * as far as the weight is concerned. This may be the currently used one if
+   * the weight is correctly spread. If the returned value is null, the best
+   * replication server is undetermined but the local server must disconnect
+   * (so the best replication server is another one than the current one).
+   */
+  public static ReplicationServerInfo computeBestServerForWeight(
+    Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
+    int localServerId)
+  {
+    /*
+     * - Compute the load goal of each RS, deducing it from the weights affected
+     * to them.
+     * - Compute the current load of each RS, deducing it from the DSs
+     * currently connected to them.
+     * - Compute the differences between the load goals and the current loads of
+     * the RSs.
+     */
+    // Sum of the weights
+    int sumOfWeights = 0;
+    // Sum of the connected DSs
+    int sumOfConnectedDSs = 0;
+    for (ReplicationServerInfo replicationServerInfo : bestServers.values())
+    {
+      sumOfWeights += replicationServerInfo.getWeight();
+      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
+    }
+    // Distance (difference) of the current loads to the load goals of each RS:
+    // key:server id, value: distance
+    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
+    // Precision for the operations (number of digits after the dot)
+    // Default value of rounding method is HALF_UP for
+    // the MathContext
+    MathContext mathContext = new MathContext(32);
+    for (Integer rsId : bestServers.keySet())
+    {
+      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+
+      int rsWeight = replicationServerInfo.getWeight();
+      //  load goal = rs weight / sum of weights
+      BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
+        new BigDecimal(sumOfWeights), mathContext);
+      BigDecimal currentLoadBd = BigDecimal.ZERO;
+      if (sumOfConnectedDSs != 0)
+      {
+        // current load = number of connected DSs / total number of DSs
+        int connectedDSs = replicationServerInfo.getConnectedDSNumber();
+        currentLoadBd = (new BigDecimal(connectedDSs)).divide(
+        new BigDecimal(sumOfConnectedDSs), mathContext);
+      }
+      // load distance = load goal - current load
+      BigDecimal loadDistanceBd =
+        loadGoalBd.subtract(currentLoadBd, mathContext);
+      loadDistances.put(rsId, loadDistanceBd);
+    }
+
+    if (currentRsServerId == -1)
+    {
+      // The local server is not connected yet
+
+      /*
+       * Find the server with the current highest distance to its load goal and
+       * choose it. Make an exception if every server is correctly balanced,
+       * that is every current load distances are equal to 0, in that case,
+       * choose the server with the highest weight
+       */
+      int bestRsId = 0; // If all server equal, return the first one
+      float highestDistance = Float.NEGATIVE_INFINITY;
+      boolean allRsWithZeroDistance = true;
+      int highestWeightRsId = -1;
+      int highestWeight = -1;
+      for (Integer rsId : bestServers.keySet())
+      {
+        float loadDistance = loadDistances.get(rsId).floatValue();
+        if (loadDistance > highestDistance)
+        {
+          // This server is far more from its balance point
+          bestRsId = rsId;
+          highestDistance = loadDistance;
+        }
+        if (loadDistance != (float)0)
+        {
+          allRsWithZeroDistance = false;
+        }
+        int weight = bestServers.get(rsId).getWeight();
+        if (weight > highestWeight)
+        {
+          // This server has a higher weight
+          highestWeightRsId = rsId;
+          highestWeight = weight;
+        }
+      }
+      // All servers with a 0 distance ?
+      if (allRsWithZeroDistance)
+      {
+        // Choose server withe the highest weight
+        bestRsId = highestWeightRsId;
+      }
+      return bestServers.get(bestRsId);
+    } else
+    {
+      // The local server is currently connected to a RS, let's see if it must
+      // disconnect or not, taking the weights into account.
+
+      float currentLoadDistance =
+        loadDistances.get(currentRsServerId).floatValue();
+      if (currentLoadDistance < (float) 0)
+      {
+        // Too much DSs connected to the current RS, compared with its load
+        // goal:
+        // Determine the potential number of DSs to disconnect from the current
+        // RS and see if the local DS is part of them: the DSs that must
+        // disconnect are those with the lowest server id.
+        // Compute the sum of the distances of the load goals of the other RSs
+        BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
+        for (Integer rsId : bestServers.keySet())
+        {
+          if (rsId != currentRsServerId)
+          {
+            sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
+              loadDistances.get(rsId), mathContext);
+          }
+        }
+
+        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
+        {
+          // The average distance of the other RSs shows a lack of DSs.
+          // Compute the number of DSs to disconnect from the current RS,
+          // rounding to the nearest integer number. Do only this if there is
+          // no risk of yoyo effect: when the exact balance cannot be
+          // established due to the current number of DSs connected, do not
+          // disconnect a DS. A simple example where the balance cannot be
+          // reached is:
+          // - RS1 has weight 1 and 2 DSs
+          // - RS2 has weight 1 and 1 DS
+          // => disconnecting a DS from RS1 to reconnect it to RS2 would have no
+          // sense as this would lead to the reverse situation. In that case,
+          // the perfect balance cannot be reached and we must stick to the
+          // current situation, otherwise the DS would keep move between the 2
+          // RSs
+          float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
+            multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
+            floatValue();
+          int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
+
+          // Avoid yoyo effect
+          if (overloadingDSsNumber == 1)
+          {
+            // What would be the new load distance for the current RS if
+            // we disconnect some DSs ?
+            ReplicationServerInfo currentReplicationServerInfo =
+              bestServers.get(currentRsServerId);
+
+            int currentRsWeight = currentReplicationServerInfo.getWeight();
+            BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
+            BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
+            BigDecimal currentRsLoadGoalBd =
+              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
+            BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
+            if (sumOfConnectedDSs != 0)
+            {
+              int connectedDSs = currentReplicationServerInfo.
+                getConnectedDSNumber();
+              BigDecimal potentialNewConnectedDSsBd =
+                new BigDecimal(connectedDSs - 1);
+              BigDecimal sumOfConnectedDSsBd =
+                new BigDecimal(sumOfConnectedDSs);
+              potentialCurrentRsNewLoadBd =
+                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
+                mathContext);
+            }
+            BigDecimal potentialCurrentRsNewLoadDistanceBd =
+              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
+              mathContext);
+
+            // What would be the new load distance for the other RSs ?
+            BigDecimal additionalDsLoadBd =
+              (new BigDecimal(1)).divide(
+              new BigDecimal(sumOfConnectedDSs), mathContext);
+            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
+              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
+              mathContext);
+
+            // Now compare both values: we must no disconnect the DS if this
+            // is for going in a situation where the load distance of the other
+            // RSs is the opposite of the future load distance of the local RS
+            // or we would evaluate that we should disconnect just after being
+            // arrived on the new RS. But we should disconnect if we reach the
+            // perfect balance (both values are 0).
+            MathContext roundMc =
+              new MathContext(6, RoundingMode.DOWN);
+            BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
+              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
+            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
+              potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
+
+            if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
+              BigDecimal.ZERO) != 0)
+              && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
+              potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
+            {
+              // Avoid the yoyo effect, and keep the local DS connected to its
+              // current RS
+              return bestServers.get(currentRsServerId);
+            }
+          }
+
+          // Prepare a sorted list (from lowest to highest) or DS server ids
+          // connected to the current RS
+          ReplicationServerInfo currentRsInfo =
+            bestServers.get(currentRsServerId);
+          List<Integer> serversConnectedToCurrentRS =
+            currentRsInfo.getConnectedDSs();
+          List<Integer> sortedServers = new ArrayList<Integer>(
+            serversConnectedToCurrentRS);
+          Collections.sort(sortedServers);
+
+          // Go through the list of DSs to disconnect and see if the local
+          // server is part of them.
+          int index = 0;
+          while (overloadingDSsNumber > 0)
+          {
+            int severToDisconnectId = sortedServers.get(index);
+            if (severToDisconnectId == localServerId)
+            {
+              // The local server is part of the DSs to disconnect
+              return null;
+            }
+            overloadingDSsNumber--;
+            index++;
+          }
+
+          // The local server is not part of the servers to disconnect from the
+          // current RS.
+          return bestServers.get(currentRsServerId);
+        } else
+        {
+          // The average distance of the other RSs does not show a lack of DSs:
+          // no need to disconnect any DS from the current RS.
+          return bestServers.get(currentRsServerId);
+        }
+      } else
+      {
+        // The RS load goal is reached or there are not enough DSs connected to
+        // it to reach it: do not disconnect from this RS and return rsInfo for
+        // this RS
+        return bestServers.get(currentRsServerId);
+      }
+    }
   }
 
   /**
@@ -1679,28 +2191,6 @@
   }
 
   /**
-   * Starts the same group id poller.
-   */
-  private void startSameGroupIdPoller()
-  {
-    sameGroupIdPoller = new SameGroupIdPoller();
-    sameGroupIdPoller.start();
-  }
-
-  /**
-   * Stops the same group id poller.
-   */
-  private synchronized void stopSameGroupIdPoller()
-  {
-    if (sameGroupIdPoller != null)
-    {
-      sameGroupIdPoller.shutdown();
-      sameGroupIdPoller.waitForShutdown();
-      sameGroupIdPoller = null;
-    }
-  }
-
-  /**
    * Stop the heartbeat monitor thread.
    */
   synchronized void stopRSHeartBeatMonitoring()
@@ -1926,7 +2416,8 @@
    * Receive a message.
    * This method is not multithread safe and should either always be
    * called in a single thread or protected by a locking mechanism
-   * before being called.
+   * before being called. This is a wrapper to the method with a boolean version
+   * so that we do not have to modify existing tests.
    *
    * @return the received message
    * @throws SocketTimeoutException if the timeout set by setSoTimeout
@@ -1934,6 +2425,26 @@
    */
   public ReplicationMsg receive() throws SocketTimeoutException
   {
+    return receive(false);
+  }
+
+  /**
+   * Receive a message.
+   * This method is not multithread safe and should either always be
+   * called in a single thread or protected by a locking mechanism
+   * before being called.
+   *
+   * @return the received message
+   * @throws SocketTimeoutException if the timeout set by setSoTimeout
+   *         has expired
+   * @param allowReconnectionMechanism If true, this allows the reconnection
+   * mechanism to disconnect the broker if it detects that it should reconnect
+   * to another replication server because of some criteria defined by the
+   * algorithm where we choose a suitable replication server.
+   */
+  public ReplicationMsg receive(boolean allowReconnectionMechanism)
+    throws SocketTimeoutException
+  {
     while (shutdown == false)
     {
       if (!connected)
@@ -1956,13 +2467,16 @@
         {
           WindowMsg windowMsg = (WindowMsg) msg;
           sendWindow.release(windowMsg.getNumAck());
-        }
-        else if (msg instanceof TopologyMsg)
+        } else if (msg instanceof TopologyMsg)
         {
-          TopologyMsg topoMsg = (TopologyMsg)msg;
+          TopologyMsg topoMsg = (TopologyMsg) msg;
           receiveTopo(topoMsg);
-        }
-        else if (msg instanceof StopMsg)
+          if (allowReconnectionMechanism)
+          {
+            // Reset wait time before next computation of best server
+            mustRunBestServerCheckingAlgorithm = 0;
+          }
+        } else if (msg instanceof StopMsg)
         {
           /*
            * RS performs a proper disconnection
@@ -1974,8 +2488,7 @@
           logError(message);
           // Try to find a suitable RS
           this.reStart(failingSession);
-        }
-        else if (msg instanceof MonitorMsg)
+        } else if (msg instanceof MonitorMsg)
         {
           // This is the response to a MonitorRequest that was sent earlier or
           // the regular message of the monitoring publisher of the RS.
@@ -1997,16 +2510,53 @@
             monitorResponse.notify();
           }
 
-          // Extract and store replication servers ServerStates
-          rsStates = new HashMap<Integer, ServerState>();
+          // Update the replication servers ServerStates with new received info
           it = monitorMsg.rsIterator();
           while (it.hasNext())
           {
             int srvId = it.next();
-            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
+            if (rsInfo != null)
+            {
+              rsInfo.update(monitorMsg.getRSServerState(srvId));
+            }
           }
-        }
-        else
+
+          // Now if it is allowed, compute the best replication server to see if
+          // it is still the one we are currently connected to. If not,
+          // disconnect properly and let the connection algorithm re-connect to
+          // best replication server
+          if (allowReconnectionMechanism)
+          {
+            mustRunBestServerCheckingAlgorithm++;
+            if (mustRunBestServerCheckingAlgorithm == 2)
+            {
+              // Stable topology (no topo msg since few seconds): proceed with
+              // best server checking.
+              ReplicationServerInfo bestServerInfo =
+                computeBestReplicationServer(false, rsServerId, state,
+                replicationServerInfos, serverId, baseDn, groupId,
+                generationID);
+
+              if ((bestServerInfo == null) ||
+                (bestServerInfo.getServerId() != rsServerId))
+              {
+                // The best replication server is no more the one we are
+                // currently using. Disconnect properly then reconnect.
+                Message message =
+                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
+                  Integer.toString(serverId),
+                  Integer.toString(rsServerId),
+                  rsServerUrl);
+                logError(message);
+                reStart();
+              }
+
+              // Reset wait time before next computation of best server
+              mustRunBestServerCheckingAlgorithm = 0;
+            }
+          }
+        } else
         {
           return msg;
         }
@@ -2018,15 +2568,14 @@
         if (shutdown == false)
         {
           if ((session == null) || (!session.closeInitiated()))
-
           {
             /*
              * We did not initiate the close on our side, log an error message.
              */
             Message message =
               ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
-                  Integer.toString(rsServerId), baseDn.toString(),
-                  Integer.toString(serverId));
+              Integer.toString(rsServerId), baseDn.toString(),
+              Integer.toString(serverId));
             logError(message);
           }
           this.reStart(failingSession);
@@ -2066,7 +2615,8 @@
         }
       }
     } catch (InterruptedException e)
-    {}
+    {
+    }
     return replicaStates;
   }
 
@@ -2081,7 +2631,7 @@
   {
     try
     {
-      updateDoneCount ++;
+      updateDoneCount++;
       if ((updateDoneCount >= halfRcvWindow) && (session != null))
       {
         session.publish(new WindowMsg(updateDoneCount));
@@ -2103,10 +2653,9 @@
     if (debugEnabled())
     {
       debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
-        " close the connection to replication server " + rsServerId + " for"
-        + " domain " + baseDn);
+        " close the connection to replication server " + rsServerId + " for" +
+        " domain " + baseDn);
     }
-    stopSameGroupIdPoller();
     stopRSHeartBeatMonitoring();
     stopChangeTimeHeartBeatPublishing();
     replicationServer = "stopped";
@@ -2237,8 +2786,8 @@
    * @param groupId            The new group id to use
    */
   public boolean changeConfig(
-      Collection<String> replicationServers, int window, long heartbeatInterval,
-      byte groupId)
+    Collection<String> replicationServers, int window, long heartbeatInterval,
+    byte groupId)
   {
     // These parameters needs to be renegotiated with the ReplicationServer
     // so if they have changed, that requires restarting the session with
@@ -2248,11 +2797,11 @@
     // A new session is necessary only when information regarding
     // the connection is modified
     if ((servers == null) ||
-        (!(replicationServers.size() == servers.size()
-        && replicationServers.containsAll(servers))) ||
-        window != this.maxRcvWindow  ||
-        heartbeatInterval != this.heartbeatInterval ||
-        (groupId != this.groupId))
+      (!(replicationServers.size() == servers.size() && replicationServers.
+      containsAll(servers))) ||
+      window != this.maxRcvWindow ||
+      heartbeatInterval != this.heartbeatInterval ||
+      (groupId != this.groupId))
     {
       needToRestartSession = true;
     }
@@ -2313,152 +2862,6 @@
   }
 
   /**
-   * In case we are connected to a RS with a different group id, we use this
-   * thread to poll presence of a RS with the same group id as ours. If a RS
-   * with the same group id is available, we close the session to force
-   * reconnection. Reconnection will choose a server with the same group id.
-   */
-  private class SameGroupIdPoller extends DirectoryThread
-  {
-
-    private boolean sameGroupIdPollershutdown = false;
-    private boolean terminated = false;
-    // Sleep interval in ms
-    private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000;
-
-    public SameGroupIdPoller()
-    {
-      super("Replication Broker Same Group Id Poller for " + baseDn.toString() +
-        " and group id " + groupId + " in server id " + serverId);
-    }
-
-    /**
-     * Wait for the completion of the same group id poller.
-     */
-    public void waitForShutdown()
-    {
-      try
-      {
-        while (terminated == false)
-        {
-          Thread.sleep(50);
-        }
-      } catch (InterruptedException e)
-      {
-        // exit the loop if this thread is interrupted.
-      }
-    }
-
-    /**
-     * Shutdown the same group id poller.
-     */
-    public void shutdown()
-    {
-      sameGroupIdPollershutdown = true;
-    }
-
-    /**
-     * Permanently look for RS with our group id and if found, break current
-     * connection to force reconnection to a new server with the right group id.
-     */
-    @Override
-    public void run()
-    {
-      boolean done = false;
-
-      if (debugEnabled())
-      {
-        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
-          " started.");
-      }
-
-      while ((!done) && (!sameGroupIdPollershutdown))
-      {
-        // Sleep some time between checks
-        try
-        {
-          Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD);
-        } catch (InterruptedException e)
-        {
-          // Stop as we are interrupted
-          sameGroupIdPollershutdown = true;
-        }
-        synchronized (connectPhaseLock)
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugInfo("Running SameGroupIdPoller for: " +
-              baseDn.toString());
-          }
-          if (session != null) // Check only if not already disconnected
-
-          {
-            for (String server : servers)
-            {
-              // Do not ask the RS we are connected to as it has for sure the
-              // wrong group id
-              if (server.equals(rsServerUrl))
-                continue;
-
-              // Connect to server and get reply message
-              ServerInfo serverInfo =
-                performPhaseOneHandshake(server, false);
-
-              // Is it a server with our group id ?
-              if (serverInfo != null)
-              {
-                if (groupId == serverInfo.getGroupId())
-                {
-                  // Found one server with the same group id as us, disconnect
-                  // session to force reconnection to a server with same group
-                  // id.
-                  Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
-                    Byte.toString(groupId), baseDn.toString(),
-                    Integer.toString(serverId));
-                  logError(message);
-
-                  if (protocolVersion >=
-                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
-                  {
-                    // V4 protocol introduces a StopMsg to properly end
-                    // communications
-                    try
-                    {
-                      session.publish(new StopMsg());
-                    } catch (IOException ioe)
-                    {
-                      // Anyway, going to close session, so nothing to do
-                    }
-                  }
-                  try
-                  {
-                    session.close();
-                  } catch (Exception e)
-                  {
-                    // The session was already closed, just ignore.
-                  }
-                  session = null;
-                  done = true; // Terminates thread as did its job.
-
-                  break;
-                }
-              }
-            } // for server
-
-          }
-        }
-      }
-
-      terminated = true;
-      if (debugEnabled())
-      {
-        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
-          " terminated.");
-      }
-    }
-  }
-
-  /**
    * Signals the RS we just entered a new status.
    * @param newStatus The status the local DS just entered
    */
@@ -2505,7 +2908,42 @@
    */
   public List<RSInfo> getRsList()
   {
-    return rsList;
+    List<RSInfo> result = new ArrayList<RSInfo>();
+
+    for (ReplicationServerInfo replicationServerInfo :
+      replicationServerInfos.values())
+    {
+      result.add(replicationServerInfo.toRSInfo());
+    }
+    return result;
+  }
+
+  /**
+   * Computes the list of DSs connected to a particular RS.
+   * @param rsId The RS id of the server one wants to know the connected DSs
+   * @param dsList The list of DSinfo from which to compute things
+   * @return The list of connected DSs to the server rsId
+   */
+  private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
+  {
+    List<Integer> connectedDSs = new ArrayList<Integer>();
+
+    if (rsServerId == rsId)
+    {
+      // If we are computing connected DSs for the RS we are connected
+      // to, we should count the local DS as the DSInfo of the local DS is not
+      // sent by the replication server in the topology message. We must count
+      // ourself as a connected server.
+      connectedDSs.add(serverId);
+    }
+
+    for (DSInfo dsInfo : dsList)
+    {
+      if (dsInfo.getRsId() == rsId)
+        connectedDSs.add(dsInfo.getDsId());
+    }
+
+    return connectedDSs;
   }
 
   /**
@@ -2516,13 +2954,49 @@
    */
   public void receiveTopo(TopologyMsg topoMsg)
   {
-    // Store new lists
-    synchronized(getDsList())
+    // Store new DS list
+    dsList = topoMsg.getDsList();
+
+    // Update replication server info list with the received topology
+    // information
+    List<Integer> rsToKeepList = new ArrayList<Integer>();
+    for (RSInfo rsInfo : topoMsg.getRsList())
     {
-      synchronized(getRsList())
+      int rsId = rsInfo.getId();
+      rsToKeepList.add(rsId); // Mark this server as still existing
+      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+      ReplicationServerInfo replicationServerInfo =
+        replicationServerInfos.get(rsId);
+      if (replicationServerInfo == null)
       {
-        dsList = topoMsg.getDsList();
-        rsList = topoMsg.getRsList();
+        // New replication server, create info for it add it to the list
+        replicationServerInfo =
+          new ReplicationServerInfo(rsInfo, connectedDSs);
+        // Set the locally configured flag for this new RS only if it is
+        // configured
+        updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
+        replicationServerInfos.put(rsId, replicationServerInfo);
+      } else
+      {
+        // Update the existing info for the replication server
+        replicationServerInfo.update(rsInfo, connectedDSs);
+      }
+    }
+
+    /**
+     * Now remove any replication server that may have disappeared from the
+     * topology.
+     */
+    Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
+      replicationServerInfos.entrySet().iterator();
+    while (rsInfoIt.hasNext())
+    {
+      Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
+      if (!rsToKeepList.contains(rsInfoEntry.getKey()))
+      {
+        // This replication server has quit the topology, remove it from the
+        // list
+        rsInfoIt.remove();
       }
     }
     if (domain != null)
@@ -2536,6 +3010,7 @@
       }
     }
   }
+
   /**
    * Check if the broker could not find any Replication Server and therefore
    * connection attempt failed.
@@ -2557,16 +3032,15 @@
     {
       ctHeartbeatPublisherThread =
         new CTHeartbeatPublisherThread(
-            "Replication CN Heartbeat sender for " +
-            baseDn + " with " + getReplicationServer(),
-            session, changeTimeHeartbeatSendInterval, serverId);
+        "Replication CN Heartbeat sender for " +
+        baseDn + " with " + getReplicationServer(),
+        session, changeTimeHeartbeatSendInterval, serverId);
       ctHeartbeatPublisherThread.start();
-    }
-    else
+    } else
     {
       if (debugEnabled())
         TRACER.debugInfo(this +
-          " is not configured to send CN heartbeat interval");
+        " is not configured to send CN heartbeat interval");
     }
   }
 

--
Gitblit v1.10.0