mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
19.53.2010 b4c27fccb2913620731a9296b04baccb69846ac7
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,12 +26,16 @@
 */
package org.opends.server.replication.service;
import java.net.UnknownHostException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -40,10 +44,12 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -51,7 +57,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
@@ -102,7 +107,7 @@
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow = 100;
  private int halfRcvWindow = rcvWindow/2;
  private int halfRcvWindow = rcvWindow / 2;
  private int maxRcvWindow = rcvWindow;
  private int timeout = 0;
  private short protocolVersion;
@@ -117,13 +122,11 @@
  private String rsServerUrl = null;
  // Our replication domain
  private ReplicationDomain domain = null;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
@@ -131,15 +134,6 @@
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  /**
   * A Map containing the ServerStates of all the replication servers in the
   * topology as seen by the ReplicationServer the last time it was polled or
   * the last time it published monitoring information.
   */
  private HashMap<Integer, ServerState> rsStates =
    new HashMap<Integer, ServerState>();
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
@@ -163,10 +157,6 @@
   */
  private boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  // Same group id poller thread
  private SameGroupIdPoller sameGroupIdPoller = null;
  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
@@ -174,7 +164,7 @@
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
  /**
   * The expected period in milliseconds between these messages are sent
   * to the replication server.  Zero means heartbeats are off.
   * to the replication server. Zero means heartbeats are off.
   */
  private long changeTimeHeartbeatSendInterval = 0;
  /*
@@ -183,12 +173,30 @@
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  // Info for other RSs.
  private List<RSInfo> rsList = new ArrayList<RSInfo>();
  private long generationID;
  private int updateDoneCount = 0;
  private boolean connectRequiresRecovery = false;
  /**
   * The map of replication server info initialized at connection time and
   * regularly updated. This is used to decide to which best suitable
   * replication server one wants to connect.
   * Key: replication server id
   * Value: replication server info for the matching replication server id
   */
  private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
   * received, it is incremented. When it reaches 2, we run the checking
   * algorithm to see if we must reconnect to another best replication server.
   * Then we reset the value to 0. But when a topology message is received, the
   * integer is reseted to 0. This insures that we wait at least one monitoring
   * publisher period before running the algorithm, but also that we wait at
   * least for a monitoring period after the last received topology message
   * (topology stabilization).
   */
  private int mustRunBestServerCheckingAlgorithm = 0;
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -228,7 +236,7 @@
    this.heartbeatInterval = heartbeatInterval;
    this.maxRcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window /2;
    this.halfRcvWindow = window / 2;
    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
  }
@@ -284,7 +292,7 @@
    return rsServerId;
  }
 /**
  /**
   * Gets the server id.
   * @return The server id
   */
@@ -300,9 +308,11 @@
  private long getGenerationID()
  {
    if (domain != null)
      return domain.getGenerationID();
    else
      return generationID;
    {
      // Update the generation id
      generationID = domain.getGenerationID();
    }
    return generationID;
  }
  /**
@@ -315,48 +325,167 @@
  }
  /**
   * Bag class for keeping info we get from a server in order to compute the
   * best one to connect to. This is in fact a wrapper to a
   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
   * Sets the locally configured flag for the passed ReplicationServerInfo
   * object, analyzing the local configuration.
   * @param
   */
  public static class ServerInfo
  private void updateRSInfoLocallyConfiguredStatus(
    ReplicationServerInfo replicationServerInfo)
  {
    // Determine if the passed ReplicationServerInfo has a URL that is present
    // in the locally configured replication servers
    String rsUrl = replicationServerInfo.getServerURL();
    if (rsUrl == null)
    {
      // The ReplicationServerInfo has been generated from a server with
      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
      // ignore this server as we do not know how to connect to it
      replicationServerInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : servers)
    {
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
        // This RS is locally configured, mark this
        replicationServerInfo.setLocallyConfigured(true);
        return;
      }
    }
    replicationServerInfo.setLocallyConfigured(false);
  }
  /**
   * Compares 2 replication servers addresses and returns true if they both
   * represent the same replication server instance.
   * @param rs1Url Replication server 1 address
   * @param rs2Url Replication server 2 address
   * @return True if both replication server addresses represent the same
   * replication server instance, false otherwise.
   */
  private static boolean isSameReplicationServerUrl(String rs1Url,
    String rs2Url)
  {
    // Get and compare ports of RS1 and RS2
    int separator1 = rs1Url.lastIndexOf(':');
    if (separator1 < 0)
    {
      // Not a RS url: should not happen
      return false;
    }
    int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1));
    int separator2 = rs2Url.lastIndexOf(':');
    if (separator2 < 0)
    {
      // Not a RS url: should not happen
      return false;
    }
    int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1));
    if (rs1Port != rs2Port)
    {
      return false;
    }
    // Get and compare addresses of RS1 and RS2
    String rs1 = rs1Url.substring(0, separator1);
    InetAddress[] rs1Addresses = null;
    try
    {
      if (rs1.equals("localhost") || rs1.equals("127.0.0.1"))
      {
        // Replace localhost with the local official hostname
        rs1 = InetAddress.getLocalHost().getHostName();
      }
      rs1Addresses = InetAddress.getAllByName(rs1);
    } catch (UnknownHostException ex)
    {
      // Unknown RS: should not happen
      return false;
    }
    String rs2 = rs2Url.substring(0, separator2);
    InetAddress[] rs2Addresses = null;
    try
    {
      if (rs2.equals("localhost") || rs2.equals("127.0.0.1"))
      {
        // Replace localhost with the local official hostname
        rs2 = InetAddress.getLocalHost().getHostName();
      }
      rs2Addresses = InetAddress.getAllByName(rs2);
    } catch (UnknownHostException ex)
    {
      // Unknown RS: should not happen
      return false;
    }
    // Now compare addresses, if at least one match, this is the same server
    for (int i = 0; i < rs1Addresses.length; i++)
    {
      InetAddress inetAddress1 = rs1Addresses[i];
      for (int j = 0; j < rs2Addresses.length; j++)
      {
        InetAddress inetAddress2 = rs2Addresses[j];
        if (inetAddress2.equals(inetAddress1))
        {
          return true;
        }
      }
    }
    return false;
  }
  /**
   * Bag class for keeping info we get from a replication server in order to
   * compute the best one to connect to. This is in fact a wrapper to a
   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
   * updated with a info coming from received topology messages or monitoring
   * messages.
   */
  public static class ReplicationServerInfo
  {
    private short protocolVersion;
    private long generationId;
    private byte groupId = (byte) -1;
    private int serverId;
    // Received server URL
    private String serverURL;
    private String baseDn = null;
    private int windowSize;
    private ServerState serverState;
    private ServerState serverState = null;
    private boolean sslEncryption;
    private int degradedStatusThreshold = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int weight = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int connectedDSNumber = -1;
    // Keeps the 1 value if created with a ReplServerStartMsg
    private int weight = 1;
    // Keeps the 0 value if created with a ReplServerStartMsg
    private int connectedDSNumber = 0;
    private List<Integer> connectedDSs = null;
    // Is this RS locally configured ? (the RS is recognized as a usable server)
    private boolean locallyConfigured = true;
    /**
     * Create a new instance of ServerInfo wrapping the passed message.
     * Create a new instance of ReplicationServerInfo wrapping the passed
     * message.
     * @param msg Message to wrap.
     * @return The new instance wrapping the passed message.
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public static ServerInfo newServerInfo(
    public static ReplicationServerInfo newInstance(
      ReplicationMsg msg) throws IllegalArgumentException
    {
      if (msg instanceof ReplServerStartMsg)
      {
        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
        return new ServerInfo(replServerStartMsg);
      }
      else if (msg instanceof ReplServerStartDSMsg)
        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
        return new ReplicationServerInfo(replServerStartMsg);
      } else if (msg instanceof ReplServerStartDSMsg)
      {
        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
        return new ServerInfo(replServerStartDSMsg);
        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
        return new ReplicationServerInfo(replServerStartDSMsg);
      }
      // Unsupported message type: should not happen
@@ -365,10 +494,10 @@
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
     * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
     */
    private ServerInfo(ReplServerStartMsg replServerStartMsg)
    private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
    {
      this.protocolVersion = replServerStartMsg.getVersion();
      this.generationId = replServerStartMsg.getGenerationId();
@@ -384,11 +513,12 @@
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
     * Constructs a ReplicationServerInfo object wrapping a
     * ReplServerStartDSMsg.
     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
     * wrap.
     */
    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    {
      this.protocolVersion = replServerStartDSMsg.getVersion();
      this.generationId = replServerStartDSMsg.getGenerationId();
@@ -514,16 +644,98 @@
    {
      return connectedDSNumber;
    }
    /**
     * Constructs a new replication server info with the passed RSInfo
     * internal values and the passed connected DSs.
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
    {
      this.serverId = rsInfo.getId();
      this.serverURL = rsInfo.getServerUrl();
      this.generationId = rsInfo.getGenerationId();
      this.groupId = rsInfo.getGroupId();
      this.weight = rsInfo.getWeight();
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
    }
    /**
     * Converts the object to a RSInfo object.
     * @return The RSInfo object matching this object.
     */
    public RSInfo toRSInfo()
    {
      return new RSInfo(serverId, serverURL, generationId, groupId, weight);
    }
    /**
     * Updates replication server info with the passed RSInfo internal values
     * and the passed connected DSs.
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public void update(RSInfo rsInfo, List<Integer> connectedDSs)
    {
      this.generationId = rsInfo.getGenerationId();
      this.groupId = rsInfo.getGroupId();
      this.weight = rsInfo.getWeight();
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
    }
    /**
     * Updates replication server info with the passed server state.
     * @param serverState The ServerState to use for the update
     */
    public void update(ServerState serverState)
    {
      if (this.serverState != null)
      {
        this.serverState.update(serverState);
      } else
      {
        this.serverState = serverState;
      }
    }
    /**
     * Get the getConnectedDSs.
     * @return the getConnectedDSs
     */
    public List<Integer> getConnectedDSs()
    {
      return connectedDSs;
    }
    /**
     * Gets the locally configured status for this RS.
     * @return the locallyConfigured
     */
    public boolean isLocallyConfigured()
    {
      return locallyConfigured;
    }
    /**
     * Sets the locally configured status for this RS.
     * @param locallyConfigured the locallyConfigured to set
     */
    public void setLocallyConfigured(boolean locallyConfigured)
    {
      this.locallyConfigured = locallyConfigured;
    }
  }
  private void connect()
  {
    if (this.baseDn.compareToIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
      ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
    {
      connectAsECL();
    }
    else
    } else
    {
      connectAsDataServer();
    }
@@ -534,19 +746,22 @@
   * able to choose the more suitable.
   * @return the collected information.
   */
  private Map<String, ServerInfo> collectReplicationServersInfo() {
  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
  {
    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    Map<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    for (String server : servers)
    {
      // Connect to server and get info about it
      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
      ReplicationServerInfo replicationServerInfo =
        performPhaseOneHandshake(server, false);
      // Store server info in list
      if (serverInfo != null)
      if (replicationServerInfo != null)
      {
        rsInfos.put(server, serverInfo);
        rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
      }
    }
@@ -558,7 +773,6 @@
   * are :
   * - 1 single RS configured
   * - so no choice of the preferred RS
   * - No same groupID polling
   * - ?? Heartbeat
   * - Start handshake is :
   *    Broker ---> StartECLMsg       ---> RS
@@ -570,10 +784,10 @@
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    ReplServerStartDSMsg inReplServerStartDSMsg
      = performECLPhaseOneHandshake(bestServer, true);
    ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
      bestServer, true);
    if (inReplServerStartDSMsg!=null)
    if (inReplServerStartDSMsg != null)
      performECLPhaseTwoHandshake(bestServer);
  }
@@ -589,7 +803,7 @@
   *
   * phase 1:
   * DS --- ServerStartMsg ---> RS
   * DS <--- ReplServerStartMsg --- RS
   * DS <--- ReplServerStartDSMsg --- RS
   * phase 2:
   * DS --- StartSessionMsg ---> RS
   * DS <--- TopologyMsg --- RS
@@ -615,9 +829,9 @@
    }
    // Stop any existing poller and heartbeat monitor from a previous session.
    stopSameGroupIdPoller();
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    mustRunBestServerCheckingAlgorithm = 0;
    boolean newServerWithSameGroupId = false;
    synchronized (connectPhaseLock)
@@ -628,41 +842,47 @@
       */
      if (debugEnabled())
        TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
            " order to elect the preferred one");
          " order to elect the preferred one");
      // Get info from every available replication servers
      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
      replicationServerInfos = collectReplicationServersInfo();
      ServerInfo serverInfo = null;
      ReplicationServerInfo replicationServerInfo = null;
      if (rsInfos.size() > 0)
      if (replicationServerInfos.size() > 0)
      {
        // At least one server answered, find the best one.
        String bestServer = computeBestReplicationServer(state, rsInfos,
          serverId, baseDn, groupId);
        replicationServerInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, baseDn, groupId,
          this.getGenerationID());
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
              "phase 2 : will perform PhaseOneH with the preferred RS.");
        serverInfo = performPhaseOneHandshake(bestServer, true);
            "phase 2 : will perform PhaseOneH with the preferred RS.");
        replicationServerInfo = performPhaseOneHandshake(
          replicationServerInfo.getServerURL(), true);
        // Update replication server info with potentially more up to date data
        // (server state for instance may have changed)
        replicationServerInfos.put(replicationServerInfo.getServerId(),
          replicationServerInfo);
        if (serverInfo != null) // Handshake phase 1 exchange went well
        if (replicationServerInfo != null)
        {
          // Handshake phase 1 exchange went well
          // Compute in which status we are starting the session to tell the RS
          ServerStatus initStatus =
            computeInitialServerStatus(serverInfo.getGenerationId(),
            serverInfo.getServerState(),
            serverInfo.getDegradedStatusThreshold(),
            computeInitialServerStatus(replicationServerInfo.getGenerationId(),
            replicationServerInfo.getServerState(),
            replicationServerInfo.getDegradedStatusThreshold(),
            this.getGenerationID());
          // Perfom session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
            initStatus);
          // Perform session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(
            replicationServerInfo.getServerURL(), initStatus);
          if (topologyMsg != null) // Handshake phase 2 exchange went well
          {
            try
            {
@@ -681,7 +901,7 @@
               * reconnection at that time to retrieve a server with our group
               * id.
               */
              byte tmpRsGroupId = serverInfo.getGroupId();
              byte tmpRsGroupId = replicationServerInfo.getGroupId();
              boolean someServersWithSameGroupId =
                hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -690,10 +910,10 @@
                ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
              {
                replicationServer = session.getReadableRemoteAddress();
                maxSendWindow = serverInfo.getWindowSize();
                rsGroupId = serverInfo.getGroupId();
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                maxSendWindow = replicationServerInfo.getWindowSize();
                rsGroupId = replicationServerInfo.getGroupId();
                rsServerId = replicationServerInfo.getServerId();
                rsServerUrl = replicationServerInfo.getServerURL();
                receiveTopo(topologyMsg);
@@ -715,27 +935,27 @@
                if (domain != null)
                {
                  domain.sessionInitiated(
                      initStatus, serverInfo.getServerState(),
                      serverInfo.getGenerationId(),
                      session);
                    initStatus, replicationServerInfo.getServerState(),
                    replicationServerInfo.getGenerationId(),
                    session);
                }
                if (getRsGroupId() != groupId)
                {
                 // Connected to replication server with wrong group id:
                 // warn user and start poller to recover when a server with
                 // right group id arrives...
                 Message message =
                   WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
                   Byte.toString(groupId),  Integer.toString(rsServerId),
                   bestServer, Byte.toString(getRsGroupId()),
                   baseDn.toString(), Integer.toString(serverId));
                 logError(message);
                 startSameGroupIdPoller();
                  // Connected to replication server with wrong group id:
                  // warn user and start poller to recover when a server with
                  // right group id arrives...
                  Message message =
                    WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
                    Byte.toString(groupId), Integer.toString(rsServerId),
                    replicationServerInfo.getServerURL(),
                    Byte.toString(getRsGroupId()),
                    baseDn.toString(), Integer.toString(serverId));
                  logError(message);
                }
                startRSHeartBeatMonitoring();
                if (serverInfo.getProtocolVersion()
                    >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
                if (replicationServerInfo.getProtocolVersion() >=
                  ProtocolVersion.REPLICATION_PROTOCOL_V3)
                {
                  startChangeTimeHeartBeatPublishing();
                }
@@ -753,7 +973,7 @@
            } catch (Exception e)
            {
              Message message = ERR_COMPUTING_FAKE_OPS.get(
                baseDn, bestServer,
                baseDn, replicationServerInfo.getServerURL(),
                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
              logError(message);
            } finally
@@ -783,8 +1003,9 @@
      {
        connectPhaseLock.notify();
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
          (serverInfo.getGenerationId() == -1))
        if ((replicationServerInfo.getGenerationId() ==
          this.getGenerationID()) ||
          (replicationServerInfo.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -801,7 +1022,7 @@
            baseDn.toString(),
            replicationServer,
            Long.toString(this.getGenerationID()),
            Long.toString(serverInfo.getGenerationId()));
            Long.toString(replicationServerInfo.getGenerationId()));
          logError(message);
        }
      } else
@@ -908,7 +1129,7 @@
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * server, wrapped in a ServerInfo object.
   * server, wrapped in a ReplicationServerInfo object.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
@@ -917,10 +1138,10 @@
   * @return The answer from the server . Null if could not
   *         get an answer.
   */
  private ServerInfo performPhaseOneHandshake(String server,
  private ReplicationServerInfo performPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ServerInfo serverInfo = null;
    ReplicationServerInfo replServerInfo = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
@@ -962,17 +1183,17 @@
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
        {
          TRACER.debugInfo("In RB for " + baseDn +
            "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
            "\nAND RECEIVED:\n" + msg.toString());
        }
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
          "\nAND RECEIVED:\n" + msg.toString());
      }
      // Wrap received message in a server info object
      serverInfo = ServerInfo.newServerInfo(msg);
      replServerInfo = ReplicationServerInfo.newInstance(msg);
      // Sanity check
      String repDn = serverInfo.getBaseDn();
      String repDn = replServerInfo.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -987,7 +1208,7 @@
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        serverInfo.getProtocolVersion());
        replServerInfo.getProtocolVersion());
      localSession.setProtocolVersion(protocolVersion);
@@ -1017,7 +1238,7 @@
      error = true;
    } catch (Exception e)
    {
      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
      if ((e instanceof SocketTimeoutException) && debugEnabled())
      {
        TRACER.debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn);
@@ -1068,7 +1289,7 @@
      }
      if (error)
      {
        serverInfo = null;
        replServerInfo = null;
      } // Be sure to return null.
    }
@@ -1080,7 +1301,7 @@
      session = localSession;
    }
    return serverInfo;
    return replServerInfo;
  }
  /**
@@ -1126,11 +1347,11 @@
      // Send our start msg.
      ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
          baseDn, 0, 0, 0, 0,
          maxRcvWindow, heartbeatInterval, state,
          ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
          isSslEncryption,
          groupId);
        baseDn, 0, 0, 0, 0,
        maxRcvWindow, heartbeatInterval, state,
        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
        isSslEncryption,
        groupId);
      localSession.publish(serverStartECLMsg);
      // Read the ReplServerStartMsg that should come back.
@@ -1189,7 +1410,7 @@
      error = true;
    } catch (Exception e)
    {
      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
      if ((e instanceof SocketTimeoutException) && debugEnabled())
      {
        TRACER.debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn);
@@ -1282,7 +1503,7 @@
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
      //  +   "\nAND RECEIVED:\n" + topologyMsg.toString());
        //  +   "\nAND RECEIVED:\n" + topologyMsg.toString());
      }
      // Alright set the timeout to the desired value
@@ -1340,13 +1561,13 @@
      {
        startSessionMsg =
          new StartSessionMsg(
              initStatus,
              domain.getRefUrls(),
              domain.isAssured(),
              domain.getAssuredMode(),
              domain.getAssuredSdLevel());
          initStatus,
          domain.getRefUrls(),
          domain.isAssured(),
          domain.getAssuredMode(),
          domain.getAssuredSdLevel());
        startSessionMsg.setEclIncludes(
            domain.getEclInclude());
          domain.getEclInclude());
      } else
      {
        startSessionMsg =
@@ -1395,269 +1616,560 @@
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   * This methods performs some filtering on the group id, then call
   * the real search for best server algorithm (searchForBestReplicationServer).
   * connect to it or determine if we must disconnect from current one to
   * re-connect to best server.
   *
   * Note: this method put as public static for unit testing purpose.
   * Note: this method is static for test purpose (access from unit tests)
   *
   * @param firstConnection True if we run this method for the very first
   * connection of the broker. False if we run this method to determine if the
   * replication server we are currently connected to is still the best or not.
   * @param rsServerId The id of the replication server we are currently
   * connected to. Only used when firstConnection is false.
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * associated information (choice will be made among them).
   * @param localServerId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @param groupId The groupId we prefer being connected to if possible
   * @return The computed best replication server.
   * @param generationId The generation id we are using
   * @return The computed best replication server. If the returned value is
   * null, the best replication server is undetermined but the local server must
   * disconnect (so the best replication server is another one than the current
   * one). Null can only be returned when firstConnection is false.
   */
  public static String computeBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int localServerId,
    String baseDn, byte groupId)
  public static ReplicationServerInfo computeBestReplicationServer(
    boolean firstConnection, int rsServerId, ServerState myState,
    Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
    String baseDn, byte groupId, long generationId)
  {
    /*
     * Preference is given to servers with the requested group id:
     * If there are some servers with the requested group id in the provided
     * server list, then we run the search algorithm only on them. If no server
     * with the requested group id, consider all of them.
     */
    // Filter for servers with same group id
    Map<String, ServerInfo> sameGroupIdRsInfos =
      new HashMap<String, ServerInfo>();
    for (String repServer : rsInfos.keySet())
    {
      ServerInfo serverInfo = rsInfos.get(repServer);
      if (serverInfo.getGroupId() == groupId)
        sameGroupIdRsInfos.put(repServer, serverInfo);
    }
    // Some servers with same group id ?
    if (sameGroupIdRsInfos.size() > 0)
    {
      return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
        localServerId, baseDn);
    } else
    {
      return searchForBestReplicationServer(myState, rsInfos,
        localServerId, baseDn);
    }
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   *
   * Note: this method put as public static for unit testing purpose.
   *
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param localServerID The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. Then, among them, choose the server that is the most up to
     * date regarding the whole topology.
     *
     * If no server is up to date regarding our own server id, find the one who
     * is the most up to date regarding our server id.
     */
    // Should never happen (sanity check)
    if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) ||
      (baseDn == null))
    {
      return null;
    }
    // Shortcut, if only one server, this is the best
    if (rsInfos.size() == 1)
    {
      for (String repServer : rsInfos.keySet())
        return repServer;
      return rsInfos.values().iterator().next();
    }
    String bestServer = null;
    boolean bestServerIsLocal = false;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
    // Servers late with regard to our changes
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differentiate up to date servers from late ones.
    /**
     * Apply some filtering criteria to determine the best servers list from
     * the available ones. The ordered list of criteria is (from more important
     * to less important):
     * - replication server has the same group id as the local DS one
     * - replication server has the same generation id as the local DS one
     * - replication server is up to date regarding changes generated by the
     *   local DS
     * - replication server in the same VM as local DS one
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
    if (myChangeNumber == null)
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    Map<Integer, ReplicationServerInfo> newBestServers;
    // The list of best replication servers is filtered with each criteria. At
    // each criteria, the list is replaced with the filtered one if some there
    // are some servers from the filtering, otherwise, the list is left as is
    // and the new filtering for the next criteria is applied and so on.
    for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
    {
      myChangeNumber = new ChangeNumber(0, 0, localServerID);
    }
    for (String repServer : rsInfos.keySet())
    {
      ServerState rsState = rsInfos.get(repServer).getServerState();
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
      if (rsChangeNumber == null)
      newBestServers = null;
      switch (filterLevel)
      {
        rsChangeNumber = new ChangeNumber(0, 0, localServerID);
        case 1:
          // Use only servers locally configured: those are servers declared in
          // the local configuration. When the current method is called, for
          // sure, at least one server from the list is locally configured
          bestServers = filterServersLocallyConfigured(bestServers);
          break;
        case 2:
          // Some servers with same group id ?
          newBestServers = filterServersWithSameGroupId(bestServers, groupId);
          if (newBestServers.size() > 0)
          {
            bestServers = newBestServers;
          }
          break;
        case 3:
          // Some servers with same generation id ?
          newBestServers = filterServersWithSameGenerationId(bestServers,
            generationId);
          if (newBestServers.size() > 0)
          {
            // Ok some servers with the right generation id
            bestServers = newBestServers;
            // If some servers with the right generation id this is useful to
            // run the local DS change criteria
            newBestServers = filterServersWithAllLocalDSChanges(bestServers,
              myState, localServerId);
            if (newBestServers.size() > 0)
            {
              bestServers = newBestServers;
            }
          }
          break;
        case 4:
          // Some servers in the local VM ?
          newBestServers = filterServersInSameVM(bestServers);
          if (newBestServers.size() > 0)
          {
            bestServers = newBestServers;
          }
          break;
      }
    }
      // Store state in right list
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
    /**
     * Now apply the choice base on the weight to the best servers list
     */
    if (bestServers.size() > 1)
    {
      if (firstConnection)
      {
        upToDateServers.put(repServer, rsState);
        // We are no connected to a server yet
        return computeBestServerForWeight(bestServers, -1, -1);
      } else
      {
        lateOnes.put(repServer, rsState);
        // We are already connected to a RS: compute the best RS as far as the
        // weights is concerned. If this is another one, some DS must
        // disconnect.
        return computeBestServerForWeight(bestServers, rsServerId,
          localServerId);
      }
    }
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose either :
       * - The local one
       * - The one that has the maximum number of changes to send us.
       *   This is the most up to date one regarding the whole topology.
       *   This server is the one which has the less
       *   difference with the topology server state.
       *   For comparison, we need to compute the difference for each
       *   server id with the topology server state.
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(), baseDn, Integer.toString(localServerID));
      logError(message);
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
       */
      ServerState topoState = new ServerState();
      for (ServerState curState : upToDateServers.values())
      {
        Iterator<Integer> it = curState.iterator();
        while (it.hasNext())
        {
          Integer sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Update topology state
          topoState.update(curSidCn);
        }
      } // For up to date servers
      // Min of the max shifts
      long minShift = -1L;
      for (String upServer : upToDateServers.keySet())
      {
        /*
         * Compute the maximum difference between the time of a server id's
         * change number and the time of the matching server id's change
         * number in the topology server state.
         *
         * Note: we could have used the sequence number here instead of the
         * timestamp, but this would have caused a problem when the sequence
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = 0;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Integer> it = curState.iterator();
        while (it.hasNext())
        {
          Integer sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Cannot be null as checked at construction time
          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          shift +=tmpShift;
        }
        boolean upServerIsLocal =
          ReplicationServer.isLocalReplicationServer(upServer);
        if ((minShift < 0) // First time in loop
            || ((shift < minShift) && upServerIsLocal)
            || (((bestServerIsLocal == false) && (shift < minShift)))
            || ((bestServerIsLocal == false) && (upServerIsLocal &&
                                              (shift<(minShift + 60)) ))
            || (shift+120 < minShift))
        {
          // This server is even closer to topo state
          bestServer = upServer;
          bestServerIsLocal = upServerIsLocal;
          minShift = shift;
        }
      } // For up to date servers
    } else
    {
      /*
       * We could not find a replication server that has seen all the
       * changes that this server has already processed,
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn, lateOnes.size());
      logError(message);
      // Min of the shifts
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, localServerID);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        boolean lateServerisLocal =
          ReplicationServer.isLocalReplicationServer(lateServer);
        if ((minShift < 0) // First time in loop
          || ((tmpShift < minShift) && lateServerisLocal)
          || (((bestServerIsLocal == false) && (tmpShift < minShift)))
          || ((bestServerIsLocal == false) && (lateServerisLocal &&
                                            (tmpShift<(minShift + 60)) ))
          || (tmpShift+120 < minShift))
        {
          // This server is even closer to topo state
          bestServer = lateServer;
          bestServerIsLocal = lateServerisLocal;
          minShift = tmpShift;
        }
      } // For late servers
      return bestServers.values().iterator().next();
    }
    return bestServer;
  }
  /**
   * Creates a new list that contains only replication servers that are locally
   * configured.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers locally configured
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersLocallyConfigured(Map<Integer,
    ReplicationServerInfo> bestServers)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.isLocallyConfigured())
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * passed group id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param groupId The group id that must match
   * @return The sub list of replication servers matching the requested group id
   * (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGroupId(Map<Integer,
    ReplicationServerInfo> bestServers, byte groupId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.getGroupId() == groupId)
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * passed generation id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param generationId The generation id that must match
   * @return The sub list of replication servers matching the requested
   * generation id (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGenerationId(Map<Integer,
    ReplicationServerInfo> bestServers, long generationId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.getGenerationId() == generationId)
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * latest changes from the passed DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param localState The state of the local DS
   * @param localServerId The server id to consider for the changes
   * @return The sub list of replication servers that have the latest changes
   * from the passed DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithAllLocalDSChanges(Map<Integer,
    ReplicationServerInfo> bestServers, ServerState localState,
    int localServerId)
  {
    Map<Integer, ReplicationServerInfo> upToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    // Extract the change number of the latest change generated by the local
    // server
    ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, localServerId);
    }
    /**
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. If some servers more up to date, prefer this list.
     */
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      ServerState rsState = replicationServerInfo.getServerState();
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, localServerId);
      }
      // Has this replication server the latest local change ?
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      {
        if (myChangeNumber.equals(rsChangeNumber))
        {
          // This replication server has exactly the latest change from the
          // local server
          upToDateServers.put(rsId, replicationServerInfo);
        } else
        {
          // This replication server is even more up to date than the local
          // server
          moreUpToDateServers.put(rsId, replicationServerInfo);
        }
      }
    }
    if (moreUpToDateServers.size() > 0)
    {
      // Prefer servers more up to date than local server
      return moreUpToDateServers;
    } else
    {
      return upToDateServers;
    }
  }
  /**
   * Creates a new list that contains only replication servers that are in the
   * same VM as the local DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers being in the same VM as the
   * local DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
    Map<Integer, ReplicationServerInfo> bestServers)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (ReplicationServer.isLocalReplicationServer(
        replicationServerInfo.getServerURL()))
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Computes the best replication server the local server should be connected
   * to so that the load is correctly spread across the topology, following the
   * weights guidance.
   * Warning: This method is expected to be called with at least 2 servers in
   * bestServers
   * Note: this method is static for test purpose (access from unit tests)
   * @param bestServers The list of replication servers to consider
   * @param currentRsServerId The replication server the local server is
   *        currently connected to. -1 if the local server is not yet connected
   *        to any replication server.
   * @param localServerId The server id of the local server. This is not used
   *        when it is not connected to a replication server
   *        (currentRsServerId = -1)
   * @return The replication server the local server should be connected to
   * as far as the weight is concerned. This may be the currently used one if
   * the weight is correctly spread. If the returned value is null, the best
   * replication server is undetermined but the local server must disconnect
   * (so the best replication server is another one than the current one).
   */
  public static ReplicationServerInfo computeBestServerForWeight(
    Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
    int localServerId)
  {
    /*
     * - Compute the load goal of each RS, deducing it from the weights affected
     * to them.
     * - Compute the current load of each RS, deducing it from the DSs
     * currently connected to them.
     * - Compute the differences between the load goals and the current loads of
     * the RSs.
     */
    // Sum of the weights
    int sumOfWeights = 0;
    // Sum of the connected DSs
    int sumOfConnectedDSs = 0;
    for (ReplicationServerInfo replicationServerInfo : bestServers.values())
    {
      sumOfWeights += replicationServerInfo.getWeight();
      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
    }
    // Distance (difference) of the current loads to the load goals of each RS:
    // key:server id, value: distance
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the dot)
    // Default value of rounding method is HALF_UP for
    // the MathContext
    MathContext mathContext = new MathContext(32);
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      int rsWeight = replicationServerInfo.getWeight();
      //  load goal = rs weight / sum of weights
      BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
        new BigDecimal(sumOfWeights), mathContext);
      BigDecimal currentLoadBd = BigDecimal.ZERO;
      if (sumOfConnectedDSs != 0)
      {
        // current load = number of connected DSs / total number of DSs
        int connectedDSs = replicationServerInfo.getConnectedDSNumber();
        currentLoadBd = (new BigDecimal(connectedDSs)).divide(
        new BigDecimal(sumOfConnectedDSs), mathContext);
      }
      // load distance = load goal - current load
      BigDecimal loadDistanceBd =
        loadGoalBd.subtract(currentLoadBd, mathContext);
      loadDistances.put(rsId, loadDistanceBd);
    }
    if (currentRsServerId == -1)
    {
      // The local server is not connected yet
      /*
       * Find the server with the current highest distance to its load goal and
       * choose it. Make an exception if every server is correctly balanced,
       * that is every current load distances are equal to 0, in that case,
       * choose the server with the highest weight
       */
      int bestRsId = 0; // If all server equal, return the first one
      float highestDistance = Float.NEGATIVE_INFINITY;
      boolean allRsWithZeroDistance = true;
      int highestWeightRsId = -1;
      int highestWeight = -1;
      for (Integer rsId : bestServers.keySet())
      {
        float loadDistance = loadDistances.get(rsId).floatValue();
        if (loadDistance > highestDistance)
        {
          // This server is far more from its balance point
          bestRsId = rsId;
          highestDistance = loadDistance;
        }
        if (loadDistance != (float)0)
        {
          allRsWithZeroDistance = false;
        }
        int weight = bestServers.get(rsId).getWeight();
        if (weight > highestWeight)
        {
          // This server has a higher weight
          highestWeightRsId = rsId;
          highestWeight = weight;
        }
      }
      // All servers with a 0 distance ?
      if (allRsWithZeroDistance)
      {
        // Choose server withe the highest weight
        bestRsId = highestWeightRsId;
      }
      return bestServers.get(bestRsId);
    } else
    {
      // The local server is currently connected to a RS, let's see if it must
      // disconnect or not, taking the weights into account.
      float currentLoadDistance =
        loadDistances.get(currentRsServerId).floatValue();
      if (currentLoadDistance < (float) 0)
      {
        // Too much DSs connected to the current RS, compared with its load
        // goal:
        // Determine the potential number of DSs to disconnect from the current
        // RS and see if the local DS is part of them: the DSs that must
        // disconnect are those with the lowest server id.
        // Compute the sum of the distances of the load goals of the other RSs
        BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
        for (Integer rsId : bestServers.keySet())
        {
          if (rsId != currentRsServerId)
          {
            sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
              loadDistances.get(rsId), mathContext);
          }
        }
        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
        {
          // The average distance of the other RSs shows a lack of DSs.
          // Compute the number of DSs to disconnect from the current RS,
          // rounding to the nearest integer number. Do only this if there is
          // no risk of yoyo effect: when the exact balance cannot be
          // established due to the current number of DSs connected, do not
          // disconnect a DS. A simple example where the balance cannot be
          // reached is:
          // - RS1 has weight 1 and 2 DSs
          // - RS2 has weight 1 and 1 DS
          // => disconnecting a DS from RS1 to reconnect it to RS2 would have no
          // sense as this would lead to the reverse situation. In that case,
          // the perfect balance cannot be reached and we must stick to the
          // current situation, otherwise the DS would keep move between the 2
          // RSs
          float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
            multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
            floatValue();
          int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
          // Avoid yoyo effect
          if (overloadingDSsNumber == 1)
          {
            // What would be the new load distance for the current RS if
            // we disconnect some DSs ?
            ReplicationServerInfo currentReplicationServerInfo =
              bestServers.get(currentRsServerId);
            int currentRsWeight = currentReplicationServerInfo.getWeight();
            BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
            BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
            BigDecimal currentRsLoadGoalBd =
              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
            BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
            if (sumOfConnectedDSs != 0)
            {
              int connectedDSs = currentReplicationServerInfo.
                getConnectedDSNumber();
              BigDecimal potentialNewConnectedDSsBd =
                new BigDecimal(connectedDSs - 1);
              BigDecimal sumOfConnectedDSsBd =
                new BigDecimal(sumOfConnectedDSs);
              potentialCurrentRsNewLoadBd =
                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
            }
            BigDecimal potentialCurrentRsNewLoadDistanceBd =
              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);
            // What would be the new load distance for the other RSs ?
            BigDecimal additionalDsLoadBd =
              (new BigDecimal(1)).divide(
              new BigDecimal(sumOfConnectedDSs), mathContext);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
              mathContext);
            // Now compare both values: we must no disconnect the DS if this
            // is for going in a situation where the load distance of the other
            // RSs is the opposite of the future load distance of the local RS
            // or we would evaluate that we should disconnect just after being
            // arrived on the new RS. But we should disconnect if we reach the
            // perfect balance (both values are 0).
            MathContext roundMc =
              new MathContext(6, RoundingMode.DOWN);
            BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
              potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
            if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
              BigDecimal.ZERO) != 0)
              && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
              potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
            {
              // Avoid the yoyo effect, and keep the local DS connected to its
              // current RS
              return bestServers.get(currentRsServerId);
            }
          }
          // Prepare a sorted list (from lowest to highest) or DS server ids
          // connected to the current RS
          ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
          List<Integer> serversConnectedToCurrentRS =
            currentRsInfo.getConnectedDSs();
          List<Integer> sortedServers = new ArrayList<Integer>(
            serversConnectedToCurrentRS);
          Collections.sort(sortedServers);
          // Go through the list of DSs to disconnect and see if the local
          // server is part of them.
          int index = 0;
          while (overloadingDSsNumber > 0)
          {
            int severToDisconnectId = sortedServers.get(index);
            if (severToDisconnectId == localServerId)
            {
              // The local server is part of the DSs to disconnect
              return null;
            }
            overloadingDSsNumber--;
            index++;
          }
          // The local server is not part of the servers to disconnect from the
          // current RS.
          return bestServers.get(currentRsServerId);
        } else
        {
          // The average distance of the other RSs does not show a lack of DSs:
          // no need to disconnect any DS from the current RS.
          return bestServers.get(currentRsServerId);
        }
      } else
      {
        // The RS load goal is reached or there are not enough DSs connected to
        // it to reach it: do not disconnect from this RS and return rsInfo for
        // this RS
        return bestServers.get(currentRsServerId);
      }
    }
  }
  /**
@@ -1679,28 +2191,6 @@
  }
  /**
   * Starts the same group id poller.
   */
  private void startSameGroupIdPoller()
  {
    sameGroupIdPoller = new SameGroupIdPoller();
    sameGroupIdPoller.start();
  }
  /**
   * Stops the same group id poller.
   */
  private synchronized void stopSameGroupIdPoller()
  {
    if (sameGroupIdPoller != null)
    {
      sameGroupIdPoller.shutdown();
      sameGroupIdPoller.waitForShutdown();
      sameGroupIdPoller = null;
    }
  }
  /**
   * Stop the heartbeat monitor thread.
   */
  synchronized void stopRSHeartBeatMonitoring()
@@ -1926,7 +2416,8 @@
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   * before being called. This is a wrapper to the method with a boolean version
   * so that we do not have to modify existing tests.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
@@ -1934,6 +2425,26 @@
   */
  public ReplicationMsg receive() throws SocketTimeoutException
  {
    return receive(false);
  }
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   * @param allowReconnectionMechanism If true, this allows the reconnection
   * mechanism to disconnect the broker if it detects that it should reconnect
   * to another replication server because of some criteria defined by the
   * algorithm where we choose a suitable replication server.
   */
  public ReplicationMsg receive(boolean allowReconnectionMechanism)
    throws SocketTimeoutException
  {
    while (shutdown == false)
    {
      if (!connected)
@@ -1956,13 +2467,16 @@
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else if (msg instanceof TopologyMsg)
        } else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg)msg;
          TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg);
        }
        else if (msg instanceof StopMsg)
          if (allowReconnectionMechanism)
          {
            // Reset wait time before next computation of best server
            mustRunBestServerCheckingAlgorithm = 0;
          }
        } else if (msg instanceof StopMsg)
        {
          /*
           * RS performs a proper disconnection
@@ -1974,8 +2488,7 @@
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else if (msg instanceof MonitorMsg)
        } else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
@@ -1997,16 +2510,53 @@
            monitorResponse.notify();
          }
          // Extract and store replication servers ServerStates
          rsStates = new HashMap<Integer, ServerState>();
          // Update the replication servers ServerStates with new received info
          it = monitorMsg.rsIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
            if (rsInfo != null)
            {
              rsInfo.update(monitorMsg.getRSServerState(srvId));
            }
          }
        }
        else
          // Now if it is allowed, compute the best replication server to see if
          // it is still the one we are currently connected to. If not,
          // disconnect properly and let the connection algorithm re-connect to
          // best replication server
          if (allowReconnectionMechanism)
          {
            mustRunBestServerCheckingAlgorithm++;
            if (mustRunBestServerCheckingAlgorithm == 2)
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              ReplicationServerInfo bestServerInfo =
                computeBestReplicationServer(false, rsServerId, state,
                replicationServerInfos, serverId, baseDn, groupId,
                generationID);
              if ((bestServerInfo == null) ||
                (bestServerInfo.getServerId() != rsServerId))
              {
                // The best replication server is no more the one we are
                // currently using. Disconnect properly then reconnect.
                Message message =
                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
                  Integer.toString(serverId),
                  Integer.toString(rsServerId),
                  rsServerUrl);
                logError(message);
                reStart();
              }
              // Reset wait time before next computation of best server
              mustRunBestServerCheckingAlgorithm = 0;
            }
          }
        } else
        {
          return msg;
        }
@@ -2018,15 +2568,14 @@
        if (shutdown == false)
        {
          if ((session == null) || (!session.closeInitiated()))
          {
            /*
             * We did not initiate the close on our side, log an error message.
             */
            Message message =
              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
                  Integer.toString(rsServerId), baseDn.toString(),
                  Integer.toString(serverId));
              Integer.toString(rsServerId), baseDn.toString(),
              Integer.toString(serverId));
            logError(message);
          }
          this.reStart(failingSession);
@@ -2066,7 +2615,8 @@
        }
      }
    } catch (InterruptedException e)
    {}
    {
    }
    return replicaStates;
  }
@@ -2081,7 +2631,7 @@
  {
    try
    {
      updateDoneCount ++;
      updateDoneCount++;
      if ((updateDoneCount >= halfRcvWindow) && (session != null))
      {
        session.publish(new WindowMsg(updateDoneCount));
@@ -2103,10 +2653,9 @@
    if (debugEnabled())
    {
      debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
        " close the connection to replication server " + rsServerId + " for"
        + " domain " + baseDn);
        " close the connection to replication server " + rsServerId + " for" +
        " domain " + baseDn);
    }
    stopSameGroupIdPoller();
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    replicationServer = "stopped";
@@ -2237,8 +2786,8 @@
   * @param groupId            The new group id to use
   */
  public boolean changeConfig(
      Collection<String> replicationServers, int window, long heartbeatInterval,
      byte groupId)
    Collection<String> replicationServers, int window, long heartbeatInterval,
    byte groupId)
  {
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
@@ -2248,11 +2797,11 @@
    // A new session is necessary only when information regarding
    // the connection is modified
    if ((servers == null) ||
        (!(replicationServers.size() == servers.size()
        && replicationServers.containsAll(servers))) ||
        window != this.maxRcvWindow  ||
        heartbeatInterval != this.heartbeatInterval ||
        (groupId != this.groupId))
      (!(replicationServers.size() == servers.size() && replicationServers.
      containsAll(servers))) ||
      window != this.maxRcvWindow ||
      heartbeatInterval != this.heartbeatInterval ||
      (groupId != this.groupId))
    {
      needToRestartSession = true;
    }
@@ -2313,152 +2862,6 @@
  }
  /**
   * In case we are connected to a RS with a different group id, we use this
   * thread to poll presence of a RS with the same group id as ours. If a RS
   * with the same group id is available, we close the session to force
   * reconnection. Reconnection will choose a server with the same group id.
   */
  private class SameGroupIdPoller extends DirectoryThread
  {
    private boolean sameGroupIdPollershutdown = false;
    private boolean terminated = false;
    // Sleep interval in ms
    private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000;
    public SameGroupIdPoller()
    {
      super("Replication Broker Same Group Id Poller for " + baseDn.toString() +
        " and group id " + groupId + " in server id " + serverId);
    }
    /**
     * Wait for the completion of the same group id poller.
     */
    public void waitForShutdown()
    {
      try
      {
        while (terminated == false)
        {
          Thread.sleep(50);
        }
      } catch (InterruptedException e)
      {
        // exit the loop if this thread is interrupted.
      }
    }
    /**
     * Shutdown the same group id poller.
     */
    public void shutdown()
    {
      sameGroupIdPollershutdown = true;
    }
    /**
     * Permanently look for RS with our group id and if found, break current
     * connection to force reconnection to a new server with the right group id.
     */
    @Override
    public void run()
    {
      boolean done = false;
      if (debugEnabled())
      {
        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
          " started.");
      }
      while ((!done) && (!sameGroupIdPollershutdown))
      {
        // Sleep some time between checks
        try
        {
          Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD);
        } catch (InterruptedException e)
        {
          // Stop as we are interrupted
          sameGroupIdPollershutdown = true;
        }
        synchronized (connectPhaseLock)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("Running SameGroupIdPoller for: " +
              baseDn.toString());
          }
          if (session != null) // Check only if not already disconnected
          {
            for (String server : servers)
            {
              // Do not ask the RS we are connected to as it has for sure the
              // wrong group id
              if (server.equals(rsServerUrl))
                continue;
              // Connect to server and get reply message
              ServerInfo serverInfo =
                performPhaseOneHandshake(server, false);
              // Is it a server with our group id ?
              if (serverInfo != null)
              {
                if (groupId == serverInfo.getGroupId())
                {
                  // Found one server with the same group id as us, disconnect
                  // session to force reconnection to a server with same group
                  // id.
                  Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
                    Byte.toString(groupId), baseDn.toString(),
                    Integer.toString(serverId));
                  logError(message);
                  if (protocolVersion >=
                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
                  {
                    // V4 protocol introduces a StopMsg to properly end
                    // communications
                    try
                    {
                      session.publish(new StopMsg());
                    } catch (IOException ioe)
                    {
                      // Anyway, going to close session, so nothing to do
                    }
                  }
                  try
                  {
                    session.close();
                  } catch (Exception e)
                  {
                    // The session was already closed, just ignore.
                  }
                  session = null;
                  done = true; // Terminates thread as did its job.
                  break;
                }
              }
            } // for server
          }
        }
      }
      terminated = true;
      if (debugEnabled())
      {
        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
          " terminated.");
      }
    }
  }
  /**
   * Signals the RS we just entered a new status.
   * @param newStatus The status the local DS just entered
   */
@@ -2505,7 +2908,42 @@
   */
  public List<RSInfo> getRsList()
  {
    return rsList;
    List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo replicationServerInfo :
      replicationServerInfos.values())
    {
      result.add(replicationServerInfo.toRSInfo());
    }
    return result;
  }
  /**
   * Computes the list of DSs connected to a particular RS.
   * @param rsId The RS id of the server one wants to know the connected DSs
   * @param dsList The list of DSinfo from which to compute things
   * @return The list of connected DSs to the server rsId
   */
  private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
  {
    List<Integer> connectedDSs = new ArrayList<Integer>();
    if (rsServerId == rsId)
    {
      // If we are computing connected DSs for the RS we are connected
      // to, we should count the local DS as the DSInfo of the local DS is not
      // sent by the replication server in the topology message. We must count
      // ourself as a connected server.
      connectedDSs.add(serverId);
    }
    for (DSInfo dsInfo : dsList)
    {
      if (dsInfo.getRsId() == rsId)
        connectedDSs.add(dsInfo.getDsId());
    }
    return connectedDSs;
  }
  /**
@@ -2516,13 +2954,49 @@
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    // Store new lists
    synchronized(getDsList())
    // Store new DS list
    dsList = topoMsg.getDsList();
    // Update replication server info list with the received topology
    // information
    List<Integer> rsToKeepList = new ArrayList<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    {
      synchronized(getRsList())
      int rsId = rsInfo.getId();
      rsToKeepList.add(rsId); // Mark this server as still existing
      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      ReplicationServerInfo replicationServerInfo =
        replicationServerInfos.get(rsId);
      if (replicationServerInfo == null)
      {
        dsList = topoMsg.getDsList();
        rsList = topoMsg.getRsList();
        // New replication server, create info for it add it to the list
        replicationServerInfo =
          new ReplicationServerInfo(rsInfo, connectedDSs);
        // Set the locally configured flag for this new RS only if it is
        // configured
        updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
        replicationServerInfos.put(rsId, replicationServerInfo);
      } else
      {
        // Update the existing info for the replication server
        replicationServerInfo.update(rsInfo, connectedDSs);
      }
    }
    /**
     * Now remove any replication server that may have disappeared from the
     * topology.
     */
    Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
      replicationServerInfos.entrySet().iterator();
    while (rsInfoIt.hasNext())
    {
      Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
      if (!rsToKeepList.contains(rsInfoEntry.getKey()))
      {
        // This replication server has quit the topology, remove it from the
        // list
        rsInfoIt.remove();
      }
    }
    if (domain != null)
@@ -2536,6 +3010,7 @@
      }
    }
  }
  /**
   * Check if the broker could not find any Replication Server and therefore
   * connection attempt failed.
@@ -2557,16 +3032,15 @@
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
            "Replication CN Heartbeat sender for " +
            baseDn + " with " + getReplicationServer(),
            session, changeTimeHeartbeatSendInterval, serverId);
        "Replication CN Heartbeat sender for " +
        baseDn + " with " + getReplicationServer(),
        session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
    }
    else
    } else
    {
      if (debugEnabled())
        TRACER.debugInfo(this +
          " is not configured to send CN heartbeat interval");
        " is not configured to send CN heartbeat interval");
    }
  }