mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
19.53.2010 874f6e3a092bdaa5f151c512c9284b15f5886e82
This is about refactoring the way the directory server chooses the 
replication server it will connect to. This also introduces a new
(weighed) load balancing feature that spreads DS connections across the
RSs, according to the RS weights defined by the administrator,

Issue 4343: https://opends.dev.java.net/issues/show_bug.cgi?id=4343

The commit includes necessary modifications for implementing what is
described in this document:

https://www.opends.org/wiki/page/ReplicationServerSelection

There is a little "implementation" section in this document that
explains some important stuff.

Also good to know for these code modifications:

- The topology info related to RSs and kept by the ReplicationBroker is
now fully kept in a new map of a new ReplicationServerInfo bag class.
This map is updated upon reception of a TopologyMsg.
- Protocol change: the TopologyMsg now includes the RS url in the RSInfo
list
- The dynamic change of the weight of a RS triggers a new TopologyMsg
being fired, to support dynamic change of weights and automatic topology
re-connections
- SameGroupIdPoller thread has disappeared and its functionality is
replaced by the mechanism that re-evaluates the more suitable RS (see
section 5, in the document for more details)
1 files added
20 files modified
4502 ■■■■ changed files
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml 14 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/DSInfo.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/RSInfo.java 40 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 31 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 22 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 1480 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 1494 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java 1318 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java 20 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java 8 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 10 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 4 ●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2009 Sun Microsystems, Inc.
  !      Copyright 2007-2010 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="replication-server"
  plural-name="replication-servers"
@@ -219,8 +219,8 @@
      The timeout value when waiting for assured mode acknowledgments.
    </adm:synopsis>
    <adm:description>
      Defines the amount of milliseconds the replication server will wait for
      assured acknowledgments (in either Safe Data or Safe Read assured sub
      Defines the number of milliseconds that the replication server will wait
      for assured acknowledgments (in either Safe Data or Safe Read assured sub
      modes) before forgetting them and answer to the entity that sent an update
      and is waiting for acknowledgment.
    </adm:description>
@@ -288,7 +288,7 @@
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="0"></adm:integer>
      <adm:integer lower-limit="1"></adm:integer>
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
@@ -301,9 +301,9 @@
      The period between sending of monitoring messages.
    </adm:synopsis>
    <adm:description>
      Defines the amount of milliseconds the replication server will wait before
      sending new monitoring messages to its peers (replication servers and
      directory servers).
      Defines the number of milliseconds that the replication server will wait
      before sending new monitoring messages to its peers (replication servers
      and directory servers).
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
opends/src/messages/messages/replication.properties
@@ -20,7 +20,7 @@
#
# CDDL HEADER END
#
#      Copyright 2006-2008 Sun Microsystems, Inc.
#      Copyright 2006-2010 Sun Microsystems, Inc.
#
#
# This file contains the primary Directory Server configuration.  It must not
@@ -448,3 +448,6 @@
 required. Reason: The provided cookie is older than the start of historical \
 in the server for the replicated domain : %s
SEVERE_ERR_INVALID_COOKIE_SYNTAX_187=Invalid syntax of the provided cookie
NOTICE_NEW_BEST_REPLICATION_SERVER_188=Domain %s (server id: %s) : \
 disconnecting from this replication server (server id: %s, url: %s) : as a \
 new one is more suitable
opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -245,7 +245,7 @@
  public String toString()
  {
    StringBuffer sb = new StringBuffer();
    sb.append("DS id: ");
    sb.append("\nDS id: ");
    sb.append(dsId);
    sb.append(" ; RS id: ");
    sb.append(rsId);
opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -42,21 +42,26 @@
  private byte groupId = (byte) -1;
  // The weight of the RS
  // It is important to keep the default value to 1 so that it is used as
  // default value for a RS using protocol V3: this default value vill be used
  // default value for a RS using protocol V3: this default value will be used
  // in algorithms that use weight
  private int weight = 1;
  // The server URL of the RS
  private String serverUrl = null;
  /**
   * Creates a new instance of RSInfo with every given info.
   *
   * @param id The RS id
   * @param serverUrl Url of the RS
   * @param generationId The generation id the RS is using
   * @param groupId RS group id
   * @param weight RS weight
   */
  public RSInfo(int id, long generationId, byte groupId, int weight)
  public RSInfo(int id, String serverUrl,
    long generationId, byte groupId, int weight)
  {
    this.id = id;
    this.serverUrl = serverUrl;
    this.generationId = generationId;
    this.groupId = groupId;
    this.weight = weight;
@@ -117,7 +122,10 @@
      return ((id == rsInfo.getId()) &&
        (generationId == rsInfo.getGenerationId()) &&
        (groupId == rsInfo.getGroupId()) &&
        (weight == rsInfo.getWeight()));
        (weight == rsInfo.getWeight()) &&
        (((serverUrl == null) && (rsInfo.getServerUrl() == null)) ||
        ((serverUrl != null) && (rsInfo.getServerUrl() != null) &&
        (serverUrl.equals(rsInfo.getServerUrl())))));
    } else
    {
      return false;
@@ -131,15 +139,25 @@
  @Override
  public int hashCode()
  {
    int hash = 5;
    hash = 37 * hash + this.id;
    hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
    hash = 37 * hash + this.groupId;
    hash = 37 * hash + this.weight;
    int hash = 7;
    hash = 17 * hash + this.id;
    hash = 17 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
    hash = 17 * hash + this.groupId;
    hash = 17 * hash + this.weight;
    hash = 17 * hash + (this.serverUrl != null ? this.serverUrl.hashCode() : 0);
    return hash;
  }
  /**
   * Gets the server URL.
   * @return the serverUrl
   */
  public String getServerUrl()
  {
    return serverUrl;
  }
  /**
   * Returns a string representation of the DS info.
   * @return A string representation of the DS info
   */
@@ -147,8 +165,10 @@
  public String toString()
  {
    StringBuffer sb = new StringBuffer();
    sb.append("Id: ");
    sb.append("\nId: ");
    sb.append(id);
    sb.append(" ; Server URL: ");
    sb.append(serverUrl);
    sb.append(" ; Generation id: ");
    sb.append(generationId);
    sb.append(" ; Group id: ");
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -126,7 +126,7 @@
   * of a server state.
   *
   * @param in the byte array where to calculate the string.
   * @param pos the position whre to start from in the byte array.
   * @param pos the position where to start from in the byte array.
   * @return the length of the next string.
   * @throws DataFormatException If the byte array does not end with null.
   */
@@ -174,6 +174,33 @@
  }
  /**
   * Update the Server State with a Server State. Every change number of this
   * object is updated with the change number of the passed server state if
   * it is newer.
   *
   * @param serverState the server state to use for the update.
   *
   * @return a boolean indicating if the update was meaningful.
   */
  public boolean update(ServerState serverState)
  {
    if (serverState == null)
      return false;
    boolean updated = false;
    for (ChangeNumber cn : serverState.list.values())
    {
      if (update(cn))
      {
        updated = true;
      }
    }
    return updated;
  }
  /**
   * Replace the Server State with another ServerState.
   *
   * @param serverState The ServerState.
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -59,6 +59,7 @@
   *   ECL entry attributes.
   * - Modified algorithm for choosing a RS to connect to: introduction of a
   *   ReplicationServerDSMsg message.
   *   -> also added of the server URL in RSInfo of TopologyMsg
   * - Introduction of a StopMsg for proper connections ending.
   */
  public static final short REPLICATION_PROTOCOL_V4 = 4;
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -80,7 +80,7 @@
   * This specifies that the request on the ECL is a PERSISTENT search
   * with changesOnly = false.
   */
  public final static short PERSISTENT_CHANGES_ONLY = 2;;
  public final static short PERSISTENT_CHANGES_ONLY = 2;
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Copyright 2007-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -198,6 +198,10 @@
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put server URL
          oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
          oStream.write(0);
          // Put RS weight
          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
          oStream.write(0);
@@ -242,8 +246,7 @@
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int dsId = Integer.valueOf(serverIdString);
        pos +=
          length + 1;
        pos += length + 1;
        /* Read RS id */
        length =
@@ -251,16 +254,14 @@
        serverIdString =
          new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos +=
          length + 1;
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos +=
          length + 1;
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
@@ -296,8 +297,7 @@
          length = getNextLength(in, pos);
          String url = new String(in, pos, length, "UTF-8");
          refUrls.add(url);
          pos +=
            length + 1;
          pos += length + 1;
          nRead++;
        }
@@ -314,8 +314,7 @@
            length = getNextLength(in, pos);
            String attr = new String(in, pos, length, "UTF-8");
            attrs.add(attr);
            pos +=
              length + 1;
            pos += length + 1;
            nRead++;
          }
        }
@@ -353,8 +352,13 @@
        byte groupId = in[pos++];
        int weight = 1;
        String serverUrl = null;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          length = getNextLength(in, pos);
          serverUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
@@ -363,7 +367,8 @@
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight);
        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
          weight);
        rsList.add(rsInfo);
        nRsInfo--;
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -265,6 +265,7 @@
      throw new ConfigException(msg, e);
    }
    groupId = (byte)configuration.getGroupId();
    weight = configuration.getWeight();
    assuredTimeout = configuration.getAssuredTimeout();
    degradedStatusThreshold = configuration.getDegradedStatusThreshold();
    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -107,7 +107,7 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
  private final String baseDn;
  // The Status analyzer that periodically verifis if the connected DSs are
  // The Status analyzer that periodically verifies if the connected DSs are
  // late or not
  private StatusAnalyzer statusAnalyzer = null;
@@ -744,7 +744,7 @@
        // Change the number of expected acks if not enough available eligible
        // servers: the level is a best effort thing, we do not want to timeout
        // at every assured SD update for instance if a RS has had his gen id
        // resetted
        // reseted
        byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
          (byte)sdl : // Keep level as it was
          (byte)(nExpectedServers+1)); // Change level to match what's available
@@ -823,7 +823,7 @@
      }
    } else
    {
      // The timeout occured for the update matching this change number and the
      // The timeout occurred for the update matching this change number and the
      // ack with timeout error has probably already been sent.
    }
  }
@@ -2026,8 +2026,8 @@
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
      replicationServer.getServerURL(), generationId,
      replicationServer.getGroupId(), replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    return new TopologyMsg(dsInfos, rsInfos);
@@ -2040,7 +2040,7 @@
   * Also put info related to local RS.
   *
   * @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and
   * that we must not include in the list DS list.
   * that we must not include in the DS list.
   * @return A suitable TopologyMsg PDU to be sent to a peer DS
   */
  public TopologyMsg createTopologyMsgForDS(int destDsId)
@@ -2058,8 +2058,8 @@
    // Add our own info (local RS)
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
      replicationServer.getServerURL(), generationId,
      replicationServer.getGroupId(), replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    // Go through every peer RSs (and get their connected DSs), also add info
@@ -2502,7 +2502,7 @@
       */
      if (allowResetGenId)
      {
        // Check if generation id has to be resetted
        // Check if generation id has to be reseted
        mayResetGenerationId();
        if (generationId < 0)
          generationId = handler.getGenerationId();
@@ -3214,7 +3214,7 @@
  /**
   * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
   * state.
   * For each DS, take the oldest CN from the changetime hearbeat state
   * For each DS, take the oldest CN from the changetime heartbeat state
   * and from the changelog db last CN. Can be null.
   * @return the eligible CN.
   */
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -1219,7 +1219,8 @@
   */
  public RSInfo toRSInfo()
  {
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight);
    RSInfo rsInfo = new RSInfo(serverId, serverURL, generationId, groupId,
      weight);
    return rsInfo;
  }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,12 +26,16 @@
 */
package org.opends.server.replication.service;
import java.net.UnknownHostException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -40,10 +44,12 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -51,7 +57,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
@@ -117,13 +122,11 @@
  private String rsServerUrl = null;
  // Our replication domain
  private ReplicationDomain domain = null;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
@@ -131,15 +134,6 @@
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  /**
   * A Map containing the ServerStates of all the replication servers in the
   * topology as seen by the ReplicationServer the last time it was polled or
   * the last time it published monitoring information.
   */
  private HashMap<Integer, ServerState> rsStates =
    new HashMap<Integer, ServerState>();
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
@@ -163,10 +157,6 @@
   */
  private boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  // Same group id poller thread
  private SameGroupIdPoller sameGroupIdPoller = null;
  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
@@ -183,12 +173,30 @@
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  // Info for other RSs.
  private List<RSInfo> rsList = new ArrayList<RSInfo>();
  private long generationID;
  private int updateDoneCount = 0;
  private boolean connectRequiresRecovery = false;
  /**
   * The map of replication server info initialized at connection time and
   * regularly updated. This is used to decide to which best suitable
   * replication server one wants to connect.
   * Key: replication server id
   * Value: replication server info for the matching replication server id
   */
  private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
   * received, it is incremented. When it reaches 2, we run the checking
   * algorithm to see if we must reconnect to another best replication server.
   * Then we reset the value to 0. But when a topology message is received, the
   * integer is reseted to 0. This insures that we wait at least one monitoring
   * publisher period before running the algorithm, but also that we wait at
   * least for a monitoring period after the last received topology message
   * (topology stabilization).
   */
  private int mustRunBestServerCheckingAlgorithm = 0;
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -300,8 +308,10 @@
  private long getGenerationID()
  {
    if (domain != null)
      return domain.getGenerationID();
    else
    {
      // Update the generation id
      generationID = domain.getGenerationID();
    }
      return generationID;
  }
@@ -315,48 +325,167 @@
  }
  /**
   * Bag class for keeping info we get from a server in order to compute the
   * best one to connect to. This is in fact a wrapper to a
   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
   * Sets the locally configured flag for the passed ReplicationServerInfo
   * object, analyzing the local configuration.
   * @param
   */
  public static class ServerInfo
  private void updateRSInfoLocallyConfiguredStatus(
    ReplicationServerInfo replicationServerInfo)
  {
    // Determine if the passed ReplicationServerInfo has a URL that is present
    // in the locally configured replication servers
    String rsUrl = replicationServerInfo.getServerURL();
    if (rsUrl == null)
    {
      // The ReplicationServerInfo has been generated from a server with
      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
      // ignore this server as we do not know how to connect to it
      replicationServerInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : servers)
    {
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
        // This RS is locally configured, mark this
        replicationServerInfo.setLocallyConfigured(true);
        return;
      }
    }
    replicationServerInfo.setLocallyConfigured(false);
  }
  /**
   * Compares 2 replication servers addresses and returns true if they both
   * represent the same replication server instance.
   * @param rs1Url Replication server 1 address
   * @param rs2Url Replication server 2 address
   * @return True if both replication server addresses represent the same
   * replication server instance, false otherwise.
   */
  private static boolean isSameReplicationServerUrl(String rs1Url,
    String rs2Url)
  {
    // Get and compare ports of RS1 and RS2
    int separator1 = rs1Url.lastIndexOf(':');
    if (separator1 < 0)
    {
      // Not a RS url: should not happen
      return false;
    }
    int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1));
    int separator2 = rs2Url.lastIndexOf(':');
    if (separator2 < 0)
    {
      // Not a RS url: should not happen
      return false;
    }
    int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1));
    if (rs1Port != rs2Port)
    {
      return false;
    }
    // Get and compare addresses of RS1 and RS2
    String rs1 = rs1Url.substring(0, separator1);
    InetAddress[] rs1Addresses = null;
    try
    {
      if (rs1.equals("localhost") || rs1.equals("127.0.0.1"))
      {
        // Replace localhost with the local official hostname
        rs1 = InetAddress.getLocalHost().getHostName();
      }
      rs1Addresses = InetAddress.getAllByName(rs1);
    } catch (UnknownHostException ex)
    {
      // Unknown RS: should not happen
      return false;
    }
    String rs2 = rs2Url.substring(0, separator2);
    InetAddress[] rs2Addresses = null;
    try
    {
      if (rs2.equals("localhost") || rs2.equals("127.0.0.1"))
      {
        // Replace localhost with the local official hostname
        rs2 = InetAddress.getLocalHost().getHostName();
      }
      rs2Addresses = InetAddress.getAllByName(rs2);
    } catch (UnknownHostException ex)
    {
      // Unknown RS: should not happen
      return false;
    }
    // Now compare addresses, if at least one match, this is the same server
    for (int i = 0; i < rs1Addresses.length; i++)
    {
      InetAddress inetAddress1 = rs1Addresses[i];
      for (int j = 0; j < rs2Addresses.length; j++)
      {
        InetAddress inetAddress2 = rs2Addresses[j];
        if (inetAddress2.equals(inetAddress1))
        {
          return true;
        }
      }
    }
    return false;
  }
  /**
   * Bag class for keeping info we get from a replication server in order to
   * compute the best one to connect to. This is in fact a wrapper to a
   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
   * updated with a info coming from received topology messages or monitoring
   * messages.
   */
  public static class ReplicationServerInfo
  {
    private short protocolVersion;
    private long generationId;
    private byte groupId = (byte) -1;
    private int serverId;
    // Received server URL
    private String serverURL;
    private String baseDn = null;
    private int windowSize;
    private ServerState serverState;
    private ServerState serverState = null;
    private boolean sslEncryption;
    private int degradedStatusThreshold = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int weight = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int connectedDSNumber = -1;
    // Keeps the 1 value if created with a ReplServerStartMsg
    private int weight = 1;
    // Keeps the 0 value if created with a ReplServerStartMsg
    private int connectedDSNumber = 0;
    private List<Integer> connectedDSs = null;
    // Is this RS locally configured ? (the RS is recognized as a usable server)
    private boolean locallyConfigured = true;
    /**
     * Create a new instance of ServerInfo wrapping the passed message.
     * Create a new instance of ReplicationServerInfo wrapping the passed
     * message.
     * @param msg Message to wrap.
     * @return The new instance wrapping the passed message.
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public static ServerInfo newServerInfo(
    public static ReplicationServerInfo newInstance(
      ReplicationMsg msg) throws IllegalArgumentException
    {
      if (msg instanceof ReplServerStartMsg)
      {
        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
        return new ServerInfo(replServerStartMsg);
      }
      else if (msg instanceof ReplServerStartDSMsg)
        return new ReplicationServerInfo(replServerStartMsg);
      } else if (msg instanceof ReplServerStartDSMsg)
      {
        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
        return new ServerInfo(replServerStartDSMsg);
        return new ReplicationServerInfo(replServerStartDSMsg);
      }
      // Unsupported message type: should not happen
@@ -365,10 +494,10 @@
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
     * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
     */
    private ServerInfo(ReplServerStartMsg replServerStartMsg)
    private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
    {
      this.protocolVersion = replServerStartMsg.getVersion();
      this.generationId = replServerStartMsg.getGenerationId();
@@ -384,11 +513,12 @@
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
     * Constructs a ReplicationServerInfo object wrapping a
     * ReplServerStartDSMsg.
     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
     * wrap.
     */
    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    {
      this.protocolVersion = replServerStartDSMsg.getVersion();
      this.generationId = replServerStartDSMsg.getGenerationId();
@@ -514,6 +644,89 @@
    {
      return connectedDSNumber;
    }
    /**
     * Constructs a new replication server info with the passed RSInfo
     * internal values and the passed connected DSs.
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
    {
      this.serverId = rsInfo.getId();
      this.serverURL = rsInfo.getServerUrl();
      this.generationId = rsInfo.getGenerationId();
      this.groupId = rsInfo.getGroupId();
      this.weight = rsInfo.getWeight();
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
    }
    /**
     * Converts the object to a RSInfo object.
     * @return The RSInfo object matching this object.
     */
    public RSInfo toRSInfo()
    {
      return new RSInfo(serverId, serverURL, generationId, groupId, weight);
    }
    /**
     * Updates replication server info with the passed RSInfo internal values
     * and the passed connected DSs.
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public void update(RSInfo rsInfo, List<Integer> connectedDSs)
    {
      this.generationId = rsInfo.getGenerationId();
      this.groupId = rsInfo.getGroupId();
      this.weight = rsInfo.getWeight();
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
    }
    /**
     * Updates replication server info with the passed server state.
     * @param serverState The ServerState to use for the update
     */
    public void update(ServerState serverState)
    {
      if (this.serverState != null)
      {
        this.serverState.update(serverState);
      } else
      {
        this.serverState = serverState;
      }
    }
    /**
     * Get the getConnectedDSs.
     * @return the getConnectedDSs
     */
    public List<Integer> getConnectedDSs()
    {
      return connectedDSs;
    }
    /**
     * Gets the locally configured status for this RS.
     * @return the locallyConfigured
     */
    public boolean isLocallyConfigured()
    {
      return locallyConfigured;
    }
    /**
     * Sets the locally configured status for this RS.
     * @param locallyConfigured the locallyConfigured to set
     */
    public void setLocallyConfigured(boolean locallyConfigured)
    {
      this.locallyConfigured = locallyConfigured;
    }
  }
  private void connect()
@@ -522,8 +735,7 @@
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
    {
      connectAsECL();
    }
    else
    } else
    {
      connectAsDataServer();
    }
@@ -534,19 +746,22 @@
   * able to choose the more suitable.
   * @return the collected information.
   */
  private Map<String, ServerInfo> collectReplicationServersInfo() {
  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
  {
    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    Map<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    for (String server : servers)
    {
      // Connect to server and get info about it
      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
      ReplicationServerInfo replicationServerInfo =
        performPhaseOneHandshake(server, false);
      // Store server info in list
      if (serverInfo != null)
      if (replicationServerInfo != null)
      {
        rsInfos.put(server, serverInfo);
        rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
      }
    }
@@ -558,7 +773,6 @@
   * are :
   * - 1 single RS configured
   * - so no choice of the preferred RS
   * - No same groupID polling
   * - ?? Heartbeat
   * - Start handshake is :
   *    Broker ---> StartECLMsg       ---> RS
@@ -570,8 +784,8 @@
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    ReplServerStartDSMsg inReplServerStartDSMsg
      = performECLPhaseOneHandshake(bestServer, true);
    ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
      bestServer, true);
    if (inReplServerStartDSMsg!=null)
      performECLPhaseTwoHandshake(bestServer);
@@ -589,7 +803,7 @@
   *
   * phase 1:
   * DS --- ServerStartMsg ---> RS
   * DS <--- ReplServerStartMsg --- RS
   * DS <--- ReplServerStartDSMsg --- RS
   * phase 2:
   * DS --- StartSessionMsg ---> RS
   * DS <--- TopologyMsg --- RS
@@ -615,9 +829,9 @@
    }
    // Stop any existing poller and heartbeat monitor from a previous session.
    stopSameGroupIdPoller();
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    mustRunBestServerCheckingAlgorithm = 0;
    boolean newServerWithSameGroupId = false;
    synchronized (connectPhaseLock)
@@ -631,38 +845,44 @@
            " order to elect the preferred one");
      // Get info from every available replication servers
      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
      replicationServerInfos = collectReplicationServersInfo();
      ServerInfo serverInfo = null;
      ReplicationServerInfo replicationServerInfo = null;
      if (rsInfos.size() > 0)
      if (replicationServerInfos.size() > 0)
      {
        // At least one server answered, find the best one.
        String bestServer = computeBestReplicationServer(state, rsInfos,
          serverId, baseDn, groupId);
        replicationServerInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, baseDn, groupId,
          this.getGenerationID());
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
              "phase 2 : will perform PhaseOneH with the preferred RS.");
        serverInfo = performPhaseOneHandshake(bestServer, true);
        replicationServerInfo = performPhaseOneHandshake(
          replicationServerInfo.getServerURL(), true);
        // Update replication server info with potentially more up to date data
        // (server state for instance may have changed)
        replicationServerInfos.put(replicationServerInfo.getServerId(),
          replicationServerInfo);
        if (serverInfo != null) // Handshake phase 1 exchange went well
        if (replicationServerInfo != null)
        {
          // Handshake phase 1 exchange went well
          // Compute in which status we are starting the session to tell the RS
          ServerStatus initStatus =
            computeInitialServerStatus(serverInfo.getGenerationId(),
            serverInfo.getServerState(),
            serverInfo.getDegradedStatusThreshold(),
            computeInitialServerStatus(replicationServerInfo.getGenerationId(),
            replicationServerInfo.getServerState(),
            replicationServerInfo.getDegradedStatusThreshold(),
            this.getGenerationID());
          // Perfom session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
            initStatus);
          // Perform session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(
            replicationServerInfo.getServerURL(), initStatus);
          if (topologyMsg != null) // Handshake phase 2 exchange went well
          {
            try
            {
@@ -681,7 +901,7 @@
               * reconnection at that time to retrieve a server with our group
               * id.
               */
              byte tmpRsGroupId = serverInfo.getGroupId();
              byte tmpRsGroupId = replicationServerInfo.getGroupId();
              boolean someServersWithSameGroupId =
                hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -690,10 +910,10 @@
                ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
              {
                replicationServer = session.getReadableRemoteAddress();
                maxSendWindow = serverInfo.getWindowSize();
                rsGroupId = serverInfo.getGroupId();
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                maxSendWindow = replicationServerInfo.getWindowSize();
                rsGroupId = replicationServerInfo.getGroupId();
                rsServerId = replicationServerInfo.getServerId();
                rsServerUrl = replicationServerInfo.getServerURL();
                receiveTopo(topologyMsg);
@@ -715,8 +935,8 @@
                if (domain != null)
                {
                  domain.sessionInitiated(
                      initStatus, serverInfo.getServerState(),
                      serverInfo.getGenerationId(),
                    initStatus, replicationServerInfo.getServerState(),
                    replicationServerInfo.getGenerationId(),
                      session);
                }
@@ -728,14 +948,14 @@
                 Message message =
                   WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
                   Byte.toString(groupId),  Integer.toString(rsServerId),
                   bestServer, Byte.toString(getRsGroupId()),
                    replicationServerInfo.getServerURL(),
                    Byte.toString(getRsGroupId()),
                   baseDn.toString(), Integer.toString(serverId));
                 logError(message);
                 startSameGroupIdPoller();
                }
                startRSHeartBeatMonitoring();
                if (serverInfo.getProtocolVersion()
                    >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
                if (replicationServerInfo.getProtocolVersion() >=
                  ProtocolVersion.REPLICATION_PROTOCOL_V3)
                {
                  startChangeTimeHeartBeatPublishing();
                }
@@ -753,7 +973,7 @@
            } catch (Exception e)
            {
              Message message = ERR_COMPUTING_FAKE_OPS.get(
                baseDn, bestServer,
                baseDn, replicationServerInfo.getServerURL(),
                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
              logError(message);
            } finally
@@ -783,8 +1003,9 @@
      {
        connectPhaseLock.notify();
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
          (serverInfo.getGenerationId() == -1))
        if ((replicationServerInfo.getGenerationId() ==
          this.getGenerationID()) ||
          (replicationServerInfo.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -801,7 +1022,7 @@
            baseDn.toString(),
            replicationServer,
            Long.toString(this.getGenerationID()),
            Long.toString(serverInfo.getGenerationId()));
            Long.toString(replicationServerInfo.getGenerationId()));
          logError(message);
        }
      } else
@@ -908,7 +1129,7 @@
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * server, wrapped in a ServerInfo object.
   * server, wrapped in a ReplicationServerInfo object.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
@@ -917,10 +1138,10 @@
   * @return The answer from the server . Null if could not
   *         get an answer.
   */
  private ServerInfo performPhaseOneHandshake(String server,
  private ReplicationServerInfo performPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ServerInfo serverInfo = null;
    ReplicationServerInfo replServerInfo = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
@@ -969,10 +1190,10 @@
        }
      // Wrap received message in a server info object
      serverInfo = ServerInfo.newServerInfo(msg);
      replServerInfo = ReplicationServerInfo.newInstance(msg);
      // Sanity check
      String repDn = serverInfo.getBaseDn();
      String repDn = replServerInfo.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -987,7 +1208,7 @@
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        serverInfo.getProtocolVersion());
        replServerInfo.getProtocolVersion());
      localSession.setProtocolVersion(protocolVersion);
@@ -1068,7 +1289,7 @@
      }
      if (error)
      {
        serverInfo = null;
        replServerInfo = null;
      } // Be sure to return null.
    }
@@ -1080,7 +1301,7 @@
      session = localSession;
    }
    return serverInfo;
    return replServerInfo;
  }
  /**
@@ -1395,269 +1616,560 @@
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   * This methods performs some filtering on the group id, then call
   * the real search for best server algorithm (searchForBestReplicationServer).
   * connect to it or determine if we must disconnect from current one to
   * re-connect to best server.
   *
   * Note: this method put as public static for unit testing purpose.
   * Note: this method is static for test purpose (access from unit tests)
   *
   * @param firstConnection True if we run this method for the very first
   * connection of the broker. False if we run this method to determine if the
   * replication server we are currently connected to is still the best or not.
   * @param rsServerId The id of the replication server we are currently
   * connected to. Only used when firstConnection is false.
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param localServerId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @param groupId The groupId we prefer being connected to if possible
   * @return The computed best replication server.
   * @param generationId The generation id we are using
   * @return The computed best replication server. If the returned value is
   * null, the best replication server is undetermined but the local server must
   * disconnect (so the best replication server is another one than the current
   * one). Null can only be returned when firstConnection is false.
   */
  public static String computeBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int localServerId,
    String baseDn, byte groupId)
  public static ReplicationServerInfo computeBestReplicationServer(
    boolean firstConnection, int rsServerId, ServerState myState,
    Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
    String baseDn, byte groupId, long generationId)
  {
    /*
     * Preference is given to servers with the requested group id:
     * If there are some servers with the requested group id in the provided
     * server list, then we run the search algorithm only on them. If no server
     * with the requested group id, consider all of them.
     */
    // Filter for servers with same group id
    Map<String, ServerInfo> sameGroupIdRsInfos =
      new HashMap<String, ServerInfo>();
    for (String repServer : rsInfos.keySet())
    {
      ServerInfo serverInfo = rsInfos.get(repServer);
      if (serverInfo.getGroupId() == groupId)
        sameGroupIdRsInfos.put(repServer, serverInfo);
    }
    // Some servers with same group id ?
    if (sameGroupIdRsInfos.size() > 0)
    {
      return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
        localServerId, baseDn);
    } else
    {
      return searchForBestReplicationServer(myState, rsInfos,
        localServerId, baseDn);
    }
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   *
   * Note: this method put as public static for unit testing purpose.
   *
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param localServerID The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. Then, among them, choose the server that is the most up to
     * date regarding the whole topology.
     *
     * If no server is up to date regarding our own server id, find the one who
     * is the most up to date regarding our server id.
     */
    // Should never happen (sanity check)
    if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) ||
      (baseDn == null))
    {
      return null;
    }
    // Shortcut, if only one server, this is the best
    if (rsInfos.size() == 1)
    {
      for (String repServer : rsInfos.keySet())
        return repServer;
      return rsInfos.values().iterator().next();
    }
    String bestServer = null;
    boolean bestServerIsLocal = false;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
    // Servers late with regard to our changes
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differentiate up to date servers from late ones.
    /**
     * Apply some filtering criteria to determine the best servers list from
     * the available ones. The ordered list of criteria is (from more important
     * to less important):
     * - replication server has the same group id as the local DS one
     * - replication server has the same generation id as the local DS one
     * - replication server is up to date regarding changes generated by the
     *   local DS
     * - replication server in the same VM as local DS one
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    Map<Integer, ReplicationServerInfo> newBestServers;
    // The list of best replication servers is filtered with each criteria. At
    // each criteria, the list is replaced with the filtered one if some there
    // are some servers from the filtering, otherwise, the list is left as is
    // and the new filtering for the next criteria is applied and so on.
    for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
    {
      newBestServers = null;
      switch (filterLevel)
      {
        case 1:
          // Use only servers locally configured: those are servers declared in
          // the local configuration. When the current method is called, for
          // sure, at least one server from the list is locally configured
          bestServers = filterServersLocallyConfigured(bestServers);
          break;
        case 2:
          // Some servers with same group id ?
          newBestServers = filterServersWithSameGroupId(bestServers, groupId);
          if (newBestServers.size() > 0)
          {
            bestServers = newBestServers;
          }
          break;
        case 3:
          // Some servers with same generation id ?
          newBestServers = filterServersWithSameGenerationId(bestServers,
            generationId);
          if (newBestServers.size() > 0)
          {
            // Ok some servers with the right generation id
            bestServers = newBestServers;
            // If some servers with the right generation id this is useful to
            // run the local DS change criteria
            newBestServers = filterServersWithAllLocalDSChanges(bestServers,
              myState, localServerId);
            if (newBestServers.size() > 0)
            {
              bestServers = newBestServers;
            }
          }
          break;
        case 4:
          // Some servers in the local VM ?
          newBestServers = filterServersInSameVM(bestServers);
          if (newBestServers.size() > 0)
          {
            bestServers = newBestServers;
          }
          break;
      }
    }
    /**
     * Now apply the choice base on the weight to the best servers list
     */
    if (bestServers.size() > 1)
    {
      if (firstConnection)
      {
        // We are no connected to a server yet
        return computeBestServerForWeight(bestServers, -1, -1);
      } else
      {
        // We are already connected to a RS: compute the best RS as far as the
        // weights is concerned. If this is another one, some DS must
        // disconnect.
        return computeBestServerForWeight(bestServers, rsServerId,
          localServerId);
      }
    } else
    {
      return bestServers.values().iterator().next();
    }
  }
  /**
   * Creates a new list that contains only replication servers that are locally
   * configured.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers locally configured
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersLocallyConfigured(Map<Integer,
    ReplicationServerInfo> bestServers)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.isLocallyConfigured())
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * passed group id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param groupId The group id that must match
   * @return The sub list of replication servers matching the requested group id
   * (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGroupId(Map<Integer,
    ReplicationServerInfo> bestServers, byte groupId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.getGroupId() == groupId)
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * passed generation id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param generationId The generation id that must match
   * @return The sub list of replication servers matching the requested
   * generation id (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithSameGenerationId(Map<Integer,
    ReplicationServerInfo> bestServers, long generationId)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (replicationServerInfo.getGenerationId() == generationId)
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Creates a new list that contains only replication servers that have the
   * latest changes from the passed DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @param localState The state of the local DS
   * @param localServerId The server id to consider for the changes
   * @return The sub list of replication servers that have the latest changes
   * from the passed DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo>
    filterServersWithAllLocalDSChanges(Map<Integer,
    ReplicationServerInfo> bestServers, ServerState localState,
    int localServerId)
  {
    Map<Integer, ReplicationServerInfo> upToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    // Extract the change number of the latest change generated by the local
    // server
    ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, localServerID);
      myChangeNumber = new ChangeNumber(0, 0, localServerId);
    }
    for (String repServer : rsInfos.keySet())
    {
      ServerState rsState = rsInfos.get(repServer).getServerState();
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
    /**
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. If some servers more up to date, prefer this list.
     */
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      ServerState rsState = replicationServerInfo.getServerState();
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, localServerID);
        rsChangeNumber = new ChangeNumber(0, 0, localServerId);
      }
      // Store state in right list
      // Has this replication server the latest local change ?
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      {
        upToDateServers.put(repServer, rsState);
        if (myChangeNumber.equals(rsChangeNumber))
        {
          // This replication server has exactly the latest change from the
          // local server
          upToDateServers.put(rsId, replicationServerInfo);
      } else
      {
        lateOnes.put(repServer, rsState);
          // This replication server is even more up to date than the local
          // server
          moreUpToDateServers.put(rsId, replicationServerInfo);
      }
    }
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose either :
       * - The local one
       * - The one that has the maximum number of changes to send us.
       *   This is the most up to date one regarding the whole topology.
       *   This server is the one which has the less
       *   difference with the topology server state.
       *   For comparison, we need to compute the difference for each
       *   server id with the topology server state.
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(), baseDn, Integer.toString(localServerID));
      logError(message);
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
       */
      ServerState topoState = new ServerState();
      for (ServerState curState : upToDateServers.values())
      {
        Iterator<Integer> it = curState.iterator();
        while (it.hasNext())
        {
          Integer sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Update topology state
          topoState.update(curSidCn);
        }
      } // For up to date servers
      // Min of the max shifts
      long minShift = -1L;
      for (String upServer : upToDateServers.keySet())
    if (moreUpToDateServers.size() > 0)
      {
        /*
         * Compute the maximum difference between the time of a server id's
         * change number and the time of the matching server id's change
         * number in the topology server state.
         *
         * Note: we could have used the sequence number here instead of the
         * timestamp, but this would have caused a problem when the sequence
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = 0;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Integer> it = curState.iterator();
        while (it.hasNext())
        {
          Integer sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Cannot be null as checked at construction time
          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          shift +=tmpShift;
        }
        boolean upServerIsLocal =
          ReplicationServer.isLocalReplicationServer(upServer);
        if ((minShift < 0) // First time in loop
            || ((shift < minShift) && upServerIsLocal)
            || (((bestServerIsLocal == false) && (shift < minShift)))
            || ((bestServerIsLocal == false) && (upServerIsLocal &&
                                              (shift<(minShift + 60)) ))
            || (shift+120 < minShift))
        {
          // This server is even closer to topo state
          bestServer = upServer;
          bestServerIsLocal = upServerIsLocal;
          minShift = shift;
        }
      } // For up to date servers
      // Prefer servers more up to date than local server
      return moreUpToDateServers;
    } else
    {
      /*
       * We could not find a replication server that has seen all the
       * changes that this server has already processed,
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn, lateOnes.size());
      logError(message);
      return upToDateServers;
    }
  }
      // Min of the shifts
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
  /**
   * Creates a new list that contains only replication servers that are in the
   * same VM as the local DS, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * @return The sub list of replication servers being in the same VM as the
   * local DS (which may be empty)
   */
  private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
    Map<Integer, ReplicationServerInfo> bestServers)
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      if (ReplicationServer.isLocalReplicationServer(
        replicationServerInfo.getServerURL()))
      {
        result.put(rsId, replicationServerInfo);
      }
    }
    return result;
  }
  /**
   * Computes the best replication server the local server should be connected
   * to so that the load is correctly spread across the topology, following the
   * weights guidance.
   * Warning: This method is expected to be called with at least 2 servers in
   * bestServers
   * Note: this method is static for test purpose (access from unit tests)
   * @param bestServers The list of replication servers to consider
   * @param currentRsServerId The replication server the local server is
   *        currently connected to. -1 if the local server is not yet connected
   *        to any replication server.
   * @param localServerId The server id of the local server. This is not used
   *        when it is not connected to a replication server
   *        (currentRsServerId = -1)
   * @return The replication server the local server should be connected to
   * as far as the weight is concerned. This may be the currently used one if
   * the weight is correctly spread. If the returned value is null, the best
   * replication server is undetermined but the local server must disconnect
   * (so the best replication server is another one than the current one).
   */
  public static ReplicationServerInfo computeBestServerForWeight(
    Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
    int localServerId)
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
     * - Compute the load goal of each RS, deducing it from the weights affected
     * to them.
     * - Compute the current load of each RS, deducing it from the DSs
     * currently connected to them.
     * - Compute the differences between the load goals and the current loads of
     * the RSs.
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
        if (ourSidCn == null)
    // Sum of the weights
    int sumOfWeights = 0;
    // Sum of the connected DSs
    int sumOfConnectedDSs = 0;
    for (ReplicationServerInfo replicationServerInfo : bestServers.values())
        {
          ourSidCn = new ChangeNumber(0, 0, localServerID);
      sumOfWeights += replicationServerInfo.getWeight();
      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        boolean lateServerisLocal =
          ReplicationServer.isLocalReplicationServer(lateServer);
        if ((minShift < 0) // First time in loop
          || ((tmpShift < minShift) && lateServerisLocal)
          || (((bestServerIsLocal == false) && (tmpShift < minShift)))
          || ((bestServerIsLocal == false) && (lateServerisLocal &&
                                            (tmpShift<(minShift + 60)) ))
          || (tmpShift+120 < minShift))
    // Distance (difference) of the current loads to the load goals of each RS:
    // key:server id, value: distance
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the dot)
    // Default value of rounding method is HALF_UP for
    // the MathContext
    MathContext mathContext = new MathContext(32);
    for (Integer rsId : bestServers.keySet())
        {
          // This server is even closer to topo state
          bestServer = lateServer;
          bestServerIsLocal = lateServerisLocal;
          minShift = tmpShift;
        }
      } // For late servers
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      int rsWeight = replicationServerInfo.getWeight();
      //  load goal = rs weight / sum of weights
      BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
        new BigDecimal(sumOfWeights), mathContext);
      BigDecimal currentLoadBd = BigDecimal.ZERO;
      if (sumOfConnectedDSs != 0)
      {
        // current load = number of connected DSs / total number of DSs
        int connectedDSs = replicationServerInfo.getConnectedDSNumber();
        currentLoadBd = (new BigDecimal(connectedDSs)).divide(
        new BigDecimal(sumOfConnectedDSs), mathContext);
    }
    return bestServer;
      // load distance = load goal - current load
      BigDecimal loadDistanceBd =
        loadGoalBd.subtract(currentLoadBd, mathContext);
      loadDistances.put(rsId, loadDistanceBd);
    }
    if (currentRsServerId == -1)
    {
      // The local server is not connected yet
      /*
       * Find the server with the current highest distance to its load goal and
       * choose it. Make an exception if every server is correctly balanced,
       * that is every current load distances are equal to 0, in that case,
       * choose the server with the highest weight
       */
      int bestRsId = 0; // If all server equal, return the first one
      float highestDistance = Float.NEGATIVE_INFINITY;
      boolean allRsWithZeroDistance = true;
      int highestWeightRsId = -1;
      int highestWeight = -1;
      for (Integer rsId : bestServers.keySet())
      {
        float loadDistance = loadDistances.get(rsId).floatValue();
        if (loadDistance > highestDistance)
        {
          // This server is far more from its balance point
          bestRsId = rsId;
          highestDistance = loadDistance;
        }
        if (loadDistance != (float)0)
        {
          allRsWithZeroDistance = false;
        }
        int weight = bestServers.get(rsId).getWeight();
        if (weight > highestWeight)
        {
          // This server has a higher weight
          highestWeightRsId = rsId;
          highestWeight = weight;
        }
      }
      // All servers with a 0 distance ?
      if (allRsWithZeroDistance)
      {
        // Choose server withe the highest weight
        bestRsId = highestWeightRsId;
      }
      return bestServers.get(bestRsId);
    } else
    {
      // The local server is currently connected to a RS, let's see if it must
      // disconnect or not, taking the weights into account.
      float currentLoadDistance =
        loadDistances.get(currentRsServerId).floatValue();
      if (currentLoadDistance < (float) 0)
      {
        // Too much DSs connected to the current RS, compared with its load
        // goal:
        // Determine the potential number of DSs to disconnect from the current
        // RS and see if the local DS is part of them: the DSs that must
        // disconnect are those with the lowest server id.
        // Compute the sum of the distances of the load goals of the other RSs
        BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
        for (Integer rsId : bestServers.keySet())
        {
          if (rsId != currentRsServerId)
          {
            sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
              loadDistances.get(rsId), mathContext);
          }
        }
        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
        {
          // The average distance of the other RSs shows a lack of DSs.
          // Compute the number of DSs to disconnect from the current RS,
          // rounding to the nearest integer number. Do only this if there is
          // no risk of yoyo effect: when the exact balance cannot be
          // established due to the current number of DSs connected, do not
          // disconnect a DS. A simple example where the balance cannot be
          // reached is:
          // - RS1 has weight 1 and 2 DSs
          // - RS2 has weight 1 and 1 DS
          // => disconnecting a DS from RS1 to reconnect it to RS2 would have no
          // sense as this would lead to the reverse situation. In that case,
          // the perfect balance cannot be reached and we must stick to the
          // current situation, otherwise the DS would keep move between the 2
          // RSs
          float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
            multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
            floatValue();
          int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
          // Avoid yoyo effect
          if (overloadingDSsNumber == 1)
          {
            // What would be the new load distance for the current RS if
            // we disconnect some DSs ?
            ReplicationServerInfo currentReplicationServerInfo =
              bestServers.get(currentRsServerId);
            int currentRsWeight = currentReplicationServerInfo.getWeight();
            BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
            BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
            BigDecimal currentRsLoadGoalBd =
              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
            BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
            if (sumOfConnectedDSs != 0)
            {
              int connectedDSs = currentReplicationServerInfo.
                getConnectedDSNumber();
              BigDecimal potentialNewConnectedDSsBd =
                new BigDecimal(connectedDSs - 1);
              BigDecimal sumOfConnectedDSsBd =
                new BigDecimal(sumOfConnectedDSs);
              potentialCurrentRsNewLoadBd =
                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
            }
            BigDecimal potentialCurrentRsNewLoadDistanceBd =
              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);
            // What would be the new load distance for the other RSs ?
            BigDecimal additionalDsLoadBd =
              (new BigDecimal(1)).divide(
              new BigDecimal(sumOfConnectedDSs), mathContext);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
              mathContext);
            // Now compare both values: we must no disconnect the DS if this
            // is for going in a situation where the load distance of the other
            // RSs is the opposite of the future load distance of the local RS
            // or we would evaluate that we should disconnect just after being
            // arrived on the new RS. But we should disconnect if we reach the
            // perfect balance (both values are 0).
            MathContext roundMc =
              new MathContext(6, RoundingMode.DOWN);
            BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
              potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
            if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
              BigDecimal.ZERO) != 0)
              && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
              potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
            {
              // Avoid the yoyo effect, and keep the local DS connected to its
              // current RS
              return bestServers.get(currentRsServerId);
            }
          }
          // Prepare a sorted list (from lowest to highest) or DS server ids
          // connected to the current RS
          ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
          List<Integer> serversConnectedToCurrentRS =
            currentRsInfo.getConnectedDSs();
          List<Integer> sortedServers = new ArrayList<Integer>(
            serversConnectedToCurrentRS);
          Collections.sort(sortedServers);
          // Go through the list of DSs to disconnect and see if the local
          // server is part of them.
          int index = 0;
          while (overloadingDSsNumber > 0)
          {
            int severToDisconnectId = sortedServers.get(index);
            if (severToDisconnectId == localServerId)
            {
              // The local server is part of the DSs to disconnect
              return null;
            }
            overloadingDSsNumber--;
            index++;
          }
          // The local server is not part of the servers to disconnect from the
          // current RS.
          return bestServers.get(currentRsServerId);
        } else
        {
          // The average distance of the other RSs does not show a lack of DSs:
          // no need to disconnect any DS from the current RS.
          return bestServers.get(currentRsServerId);
        }
      } else
      {
        // The RS load goal is reached or there are not enough DSs connected to
        // it to reach it: do not disconnect from this RS and return rsInfo for
        // this RS
        return bestServers.get(currentRsServerId);
      }
    }
  }
  /**
@@ -1679,28 +2191,6 @@
  }
  /**
   * Starts the same group id poller.
   */
  private void startSameGroupIdPoller()
  {
    sameGroupIdPoller = new SameGroupIdPoller();
    sameGroupIdPoller.start();
  }
  /**
   * Stops the same group id poller.
   */
  private synchronized void stopSameGroupIdPoller()
  {
    if (sameGroupIdPoller != null)
    {
      sameGroupIdPoller.shutdown();
      sameGroupIdPoller.waitForShutdown();
      sameGroupIdPoller = null;
    }
  }
  /**
   * Stop the heartbeat monitor thread.
   */
  synchronized void stopRSHeartBeatMonitoring()
@@ -1926,7 +2416,8 @@
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   * before being called. This is a wrapper to the method with a boolean version
   * so that we do not have to modify existing tests.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
@@ -1934,6 +2425,26 @@
   */
  public ReplicationMsg receive() throws SocketTimeoutException
  {
    return receive(false);
  }
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   * @param allowReconnectionMechanism If true, this allows the reconnection
   * mechanism to disconnect the broker if it detects that it should reconnect
   * to another replication server because of some criteria defined by the
   * algorithm where we choose a suitable replication server.
   */
  public ReplicationMsg receive(boolean allowReconnectionMechanism)
    throws SocketTimeoutException
  {
    while (shutdown == false)
    {
      if (!connected)
@@ -1956,13 +2467,16 @@
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else if (msg instanceof TopologyMsg)
        } else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg)msg;
          receiveTopo(topoMsg);
          if (allowReconnectionMechanism)
          {
            // Reset wait time before next computation of best server
            mustRunBestServerCheckingAlgorithm = 0;
        }
        else if (msg instanceof StopMsg)
        } else if (msg instanceof StopMsg)
        {
          /*
           * RS performs a proper disconnection
@@ -1974,8 +2488,7 @@
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else if (msg instanceof MonitorMsg)
        } else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
@@ -1997,16 +2510,53 @@
            monitorResponse.notify();
          }
          // Extract and store replication servers ServerStates
          rsStates = new HashMap<Integer, ServerState>();
          // Update the replication servers ServerStates with new received info
          it = monitorMsg.rsIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
            ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
            if (rsInfo != null)
            {
              rsInfo.update(monitorMsg.getRSServerState(srvId));
          }
        }
        else
          // Now if it is allowed, compute the best replication server to see if
          // it is still the one we are currently connected to. If not,
          // disconnect properly and let the connection algorithm re-connect to
          // best replication server
          if (allowReconnectionMechanism)
          {
            mustRunBestServerCheckingAlgorithm++;
            if (mustRunBestServerCheckingAlgorithm == 2)
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              ReplicationServerInfo bestServerInfo =
                computeBestReplicationServer(false, rsServerId, state,
                replicationServerInfos, serverId, baseDn, groupId,
                generationID);
              if ((bestServerInfo == null) ||
                (bestServerInfo.getServerId() != rsServerId))
              {
                // The best replication server is no more the one we are
                // currently using. Disconnect properly then reconnect.
                Message message =
                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
                  Integer.toString(serverId),
                  Integer.toString(rsServerId),
                  rsServerUrl);
                logError(message);
                reStart();
              }
              // Reset wait time before next computation of best server
              mustRunBestServerCheckingAlgorithm = 0;
            }
          }
        } else
        {
          return msg;
        }
@@ -2018,7 +2568,6 @@
        if (shutdown == false)
        {
          if ((session == null) || (!session.closeInitiated()))
          {
            /*
             * We did not initiate the close on our side, log an error message.
@@ -2066,7 +2615,8 @@
        }
      }
    } catch (InterruptedException e)
    {}
    {
    }
    return replicaStates;
  }
@@ -2103,10 +2653,9 @@
    if (debugEnabled())
    {
      debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
        " close the connection to replication server " + rsServerId + " for"
        + " domain " + baseDn);
        " close the connection to replication server " + rsServerId + " for" +
        " domain " + baseDn);
    }
    stopSameGroupIdPoller();
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    replicationServer = "stopped";
@@ -2248,8 +2797,8 @@
    // A new session is necessary only when information regarding
    // the connection is modified
    if ((servers == null) ||
        (!(replicationServers.size() == servers.size()
        && replicationServers.containsAll(servers))) ||
      (!(replicationServers.size() == servers.size() && replicationServers.
      containsAll(servers))) ||
        window != this.maxRcvWindow  ||
        heartbeatInterval != this.heartbeatInterval ||
        (groupId != this.groupId))
@@ -2313,152 +2862,6 @@
  }
  /**
   * In case we are connected to a RS with a different group id, we use this
   * thread to poll presence of a RS with the same group id as ours. If a RS
   * with the same group id is available, we close the session to force
   * reconnection. Reconnection will choose a server with the same group id.
   */
  private class SameGroupIdPoller extends DirectoryThread
  {
    private boolean sameGroupIdPollershutdown = false;
    private boolean terminated = false;
    // Sleep interval in ms
    private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000;
    public SameGroupIdPoller()
    {
      super("Replication Broker Same Group Id Poller for " + baseDn.toString() +
        " and group id " + groupId + " in server id " + serverId);
    }
    /**
     * Wait for the completion of the same group id poller.
     */
    public void waitForShutdown()
    {
      try
      {
        while (terminated == false)
        {
          Thread.sleep(50);
        }
      } catch (InterruptedException e)
      {
        // exit the loop if this thread is interrupted.
      }
    }
    /**
     * Shutdown the same group id poller.
     */
    public void shutdown()
    {
      sameGroupIdPollershutdown = true;
    }
    /**
     * Permanently look for RS with our group id and if found, break current
     * connection to force reconnection to a new server with the right group id.
     */
    @Override
    public void run()
    {
      boolean done = false;
      if (debugEnabled())
      {
        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
          " started.");
      }
      while ((!done) && (!sameGroupIdPollershutdown))
      {
        // Sleep some time between checks
        try
        {
          Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD);
        } catch (InterruptedException e)
        {
          // Stop as we are interrupted
          sameGroupIdPollershutdown = true;
        }
        synchronized (connectPhaseLock)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("Running SameGroupIdPoller for: " +
              baseDn.toString());
          }
          if (session != null) // Check only if not already disconnected
          {
            for (String server : servers)
            {
              // Do not ask the RS we are connected to as it has for sure the
              // wrong group id
              if (server.equals(rsServerUrl))
                continue;
              // Connect to server and get reply message
              ServerInfo serverInfo =
                performPhaseOneHandshake(server, false);
              // Is it a server with our group id ?
              if (serverInfo != null)
              {
                if (groupId == serverInfo.getGroupId())
                {
                  // Found one server with the same group id as us, disconnect
                  // session to force reconnection to a server with same group
                  // id.
                  Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
                    Byte.toString(groupId), baseDn.toString(),
                    Integer.toString(serverId));
                  logError(message);
                  if (protocolVersion >=
                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
                  {
                    // V4 protocol introduces a StopMsg to properly end
                    // communications
                    try
                    {
                      session.publish(new StopMsg());
                    } catch (IOException ioe)
                    {
                      // Anyway, going to close session, so nothing to do
                    }
                  }
                  try
                  {
                    session.close();
                  } catch (Exception e)
                  {
                    // The session was already closed, just ignore.
                  }
                  session = null;
                  done = true; // Terminates thread as did its job.
                  break;
                }
              }
            } // for server
          }
        }
      }
      terminated = true;
      if (debugEnabled())
      {
        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
          " terminated.");
      }
    }
  }
  /**
   * Signals the RS we just entered a new status.
   * @param newStatus The status the local DS just entered
   */
@@ -2505,7 +2908,42 @@
   */
  public List<RSInfo> getRsList()
  {
    return rsList;
    List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo replicationServerInfo :
      replicationServerInfos.values())
    {
      result.add(replicationServerInfo.toRSInfo());
    }
    return result;
  }
  /**
   * Computes the list of DSs connected to a particular RS.
   * @param rsId The RS id of the server one wants to know the connected DSs
   * @param dsList The list of DSinfo from which to compute things
   * @return The list of connected DSs to the server rsId
   */
  private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
  {
    List<Integer> connectedDSs = new ArrayList<Integer>();
    if (rsServerId == rsId)
    {
      // If we are computing connected DSs for the RS we are connected
      // to, we should count the local DS as the DSInfo of the local DS is not
      // sent by the replication server in the topology message. We must count
      // ourself as a connected server.
      connectedDSs.add(serverId);
    }
    for (DSInfo dsInfo : dsList)
    {
      if (dsInfo.getRsId() == rsId)
        connectedDSs.add(dsInfo.getDsId());
    }
    return connectedDSs;
  }
  /**
@@ -2516,13 +2954,49 @@
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    // Store new lists
    synchronized(getDsList())
    {
      synchronized(getRsList())
      {
    // Store new DS list
        dsList = topoMsg.getDsList();
        rsList = topoMsg.getRsList();
    // Update replication server info list with the received topology
    // information
    List<Integer> rsToKeepList = new ArrayList<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    {
      int rsId = rsInfo.getId();
      rsToKeepList.add(rsId); // Mark this server as still existing
      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      ReplicationServerInfo replicationServerInfo =
        replicationServerInfos.get(rsId);
      if (replicationServerInfo == null)
      {
        // New replication server, create info for it add it to the list
        replicationServerInfo =
          new ReplicationServerInfo(rsInfo, connectedDSs);
        // Set the locally configured flag for this new RS only if it is
        // configured
        updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
        replicationServerInfos.put(rsId, replicationServerInfo);
      } else
      {
        // Update the existing info for the replication server
        replicationServerInfo.update(rsInfo, connectedDSs);
      }
    }
    /**
     * Now remove any replication server that may have disappeared from the
     * topology.
     */
    Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
      replicationServerInfos.entrySet().iterator();
    while (rsInfoIt.hasNext())
    {
      Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
      if (!rsToKeepList.contains(rsInfoEntry.getKey()))
      {
        // This replication server has quit the topology, remove it from the
        // list
        rsInfoIt.remove();
      }
    }
    if (domain != null)
@@ -2536,6 +3010,7 @@
      }
    }
  }
  /**
   * Check if the broker could not find any Replication Server and therefore
   * connection attempt failed.
@@ -2561,8 +3036,7 @@
            baseDn + " with " + getReplicationServer(),
            session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
    }
    else
    } else
    {
      if (debugEnabled())
        TRACER.debugInfo(this +
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -719,7 +719,7 @@
      ReplicationMsg msg;
      try
      {
        msg = broker.receive();
        msg = broker.receive(true);
        if (msg == null)
        {
          // The server is in the shutdown process
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -69,7 +69,7 @@
    // TODO Check result;
    // Check update
    assertFalse(serverState.update(null));
    assertFalse(serverState.update((ChangeNumber)null));
    assertTrue(serverState.update(cn));
    assertFalse(serverState.update(cn));
    ChangeNumber cn1, cn2, cn3;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -518,7 +518,7 @@
        // Send topo view
        List<RSInfo> rsList = new ArrayList<RSInfo>();
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
        RSInfo rsInfo = new RSInfo(serverId, "localhost:" + port, generationId, groupId, 1);
        rsList.add(rsInfo);
        TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
          rsList);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -22,11 +22,14 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.opends.server.replication.service.ReplicationBroker.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -39,6 +42,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.server.ReplicationServer;
@@ -46,8 +50,8 @@
import org.testng.annotations.Test;
/**
 * Test the algorithm for find the best replication server among the configured
 * ones.
 * Test the algorithm for finding the best replication server among the
 * configured ones.
 */
public class ComputeBestServerTest extends ReplicationTestCase
{
@@ -93,7 +97,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -102,14 +107,15 @@
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -139,7 +145,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -150,14 +157,15 @@
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -191,7 +199,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -200,14 +209,15 @@
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -239,7 +249,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -250,14 +261,15 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -290,7 +302,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -301,9 +314,9 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -314,14 +327,15 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -354,7 +368,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -367,9 +382,9 @@
    // This server has less changes than the other one but it has the same
    // group id as us so he should be the winner
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -380,14 +395,15 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, LOOSER1, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -420,7 +436,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -431,9 +448,9 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -444,14 +461,15 @@
    cn = new ChangeNumber(2L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -485,7 +503,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -496,9 +515,9 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -509,9 +528,9 @@
    cn = new ChangeNumber(4L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
@@ -522,14 +541,15 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -563,7 +583,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -574,9 +595,9 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -587,9 +608,9 @@
    cn = new ChangeNumber(3L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, LOOSER2, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
@@ -602,14 +623,15 @@
    // This server has less changes than looser2 but it has the same
    // group id as us so he should be the winner
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(13, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
@@ -641,7 +663,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -652,278 +675,17 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 2 replication servers, late.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test2ServersLate() throws Exception
  {
    String testCase = "test2ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    int myId1 = 1;
    int myId2 = 2;
    int myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 3 replication servers, late.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test3ServersLate() throws Exception
  {
    String testCase = "test3ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    int myId1 = 1;
    int myId2 = 2;
    int myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 6 replication servers, some up, some late, one null
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test6ServersMixed() throws Exception
  {
    String testCase = "test6ServersMixed";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    int myId1 = 1;
    int myId2 = 2;
    int myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    final String LOOSER3 = "looser3";
    final String LOOSER4 = "looser4";
    final String LOOSER5 = "looser5";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(5L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(4L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(7L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER3, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 4
    aState = new ServerState();
    cn = new ChangeNumber(6L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(8L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 5 (null one for our serverid)
    aState = new ServerState();
    cn = new ChangeNumber(5L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER4, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 6
    aState = new ServerState();
    cn = new ChangeNumber(5L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(7L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER5, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  @DataProvider(name = "create3ServersData")
  public Object[][] create3ServersData() {
    return new Object[][] {
@@ -934,25 +696,26 @@
        { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
        // test that the local ServerID is more important than the others
        { 3, 0, 0, false, 1, 100, 100, false, 2, 100, 100, false},
        { 4, 0, 0, false, 2, 100, 100, false, 1, 100, 100, false},
        // test that the local RS is chosen first even if it is a bit late
        { 4, 1, 1, true, 4, 2, 3, false, 4, 2, 3, false},
        // test that the local RS is not chosen first when it is very late
        { 4, 1000, 1000, false, 4, 2, 3, true, 4, 2, 1000, true},
        // test that a remote RS is chosen first when up to date when the local
        // one is late
        { 4, 1, 1, false, 3, 1, 1, true, 3, 1, 1, false},
        // test that the local RS is not chosen first when it is missing
        // local changes
        { 4, 1, 1, false, 3, 2, 3, true, 1, 1, 1, false},
        { 4, 1, 1, false, 3, 2, 3, false, 1, 1, 1, true},
        // test that the local RS is not chosen first when it is missing
        // more local changes than another RS
        { 4, 1, 1, false, 2, 2, 3, true, 1, 1, 1, false},
        // test that a RS which is more up to date than the DS is chosen
        { 5, 1, 1, false, 2, 0, 0, false, 1, 1, 1, false},
        // test that a RS which is more up to date than the DS is chosen even
        // is some RS with the same last change from the DS
        { 5, 1, 1, false, 4, 0, 0, false, 4, 1, 1, false},
        // test that the local RS is chosen first when it is missing
        // the same local changes as the other RS
        { 3, 1, 1, true, 3, 1, 1, false, 3, 1, 1, false},
        // the same local changes as the other RSs
        { 3, 1, 1, true, 2, 1, 1, false, 3, 1, 1, false},
        };
  }
@@ -990,7 +753,8 @@
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
@@ -1001,9 +765,9 @@
    cn = new ChangeNumber(looser1T3, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser1IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
@@ -1016,9 +780,9 @@
    cn = new ChangeNumber(winnerT3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (winnerIsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
@@ -1031,17 +795,1025 @@
    cn = new ChangeNumber(looser2T3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser2IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte) 1,
      0L);
    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  @DataProvider(name = "test3ServersMoreCriteria")
  public Object[][] create3ServersMoreCriteriaData() {
    return new Object[][] {
        // Test that a RS is chosen if its group is ok whereas the other parameters
        // are not ok
        { 1L, 1L, (byte)1, false, 4L, 0L, (byte)2, false, 4L, 0L, (byte)3, false},
        // Test that a RS is chosen if its genid is ok (all RS with same group)
        // and state is not ok
        { 1L, 0L, (byte)1, false, 4L, 1L, (byte)1, false, 4L, 2L, (byte)1, false},
        // Test that a RS is chosen if all servers have wrong genid and group id
        // but it is local
        { 1L, 1L, (byte)2, true, 4L, 2L, (byte)3, false, 5L, 3L, (byte)4, false}
        };
  }
  /**
   * Test with 3 replication servers (see data provider)
   */
  @Test(dataProvider =  "test3ServersMoreCriteria")
  public void test3ServersMoreCriteria(
      long winnerT1, long winnerGenId, byte winnerGroupId, boolean winnerIsLocal,
      long looser1T1, long looser1GenId, byte looser1GroupId, boolean looser1IsLocal,
      long looser2T1, long looser2GenId, byte looser2GroupId, boolean looser2IsLocal)
      throws Exception
  {
    String testCase = "test3ServersMoreCriteria";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    int myId1 = 1;
    int myId2 = 2;
    int myId3 = 3;
    // definitions for server names
    final String WINNER  = "localhost:123";
    final String LOOSER1 = "localhost:456";
    final String LOOSER2 = "localhost:789";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
    mySt.update(cn);
    // Create replication servers info list
    HashMap<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(looser1T1, 0, myId1);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, looser1GenId,
      false, looser1GroupId, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser1IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(winnerT1, 0, myId1);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, winnerGenId,
      false, winnerGroupId, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (winnerIsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(looser2T1, 0, myId1);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, looser2GenId,
      false, looser2GroupId, 0);
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser2IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
    ReplicationServerInfo bestServer =
      computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte) 1,
      0L);
    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
    assertEquals(bestServer.getServerURL(),
      WINNER, "Wrong best replication server.");
  }
  @DataProvider(name = "testComputeBestServerForWeightProvider")
  public Object[][] testComputeBestServerForWeightProvider() {
    Object[][] testData = new Object[24][];
    HashMap<Integer, ReplicationServerInfo> rsInfos = null;
      new HashMap<Integer, ReplicationServerInfo>();
    RSInfo rsInfo = null;
    List<Integer> connectedDSs = null;
    Object[] params = null;
    /************************
     * First connection tests
     ************************/
    /**
     * 1 RS, no connected DSs
     * Expected winner: the RS
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "AwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "AwinnerHost:123"; // winner url
    testData[0] = params;
    /**
     * 2 RSs with TL=0.5, no connected DSs
     * Excepted winner: first in the list
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "BwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = rsInfos.values().iterator().next().getServerURL(); // winner url
    testData[1] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=1.0 - DS=1 ; RS2: TL=0.5 - CL=0 - DS=0
     * Excepted winner: R2 (still no connected DS)
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "CwinnerHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "CwinnerHost:456"; // winner url
    testData[2] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=0.5 - DS=1 ; RS2: TL=0.5 - CL=0.5 - DS=1
     * Excepted winner: first in the list as both RSs reached TL
     * and have same weight
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "DwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = rsInfos.values().iterator().next().getServerURL(); // winner url
    testData[3] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=2/3 - DS=2 ; RS2: TL=0.5 - CL=1/3 - DS=1
     * Excepted winner: RS2 -> 2 DSs on each RS
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "EwinnerHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "EwinnerHost:456"; // winner url
    testData[4] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=0.5 - DS=1 ; RS2: TL=2/3 - CL=0.5 - DS=1
     * Excepted winner: RS2 -> go to perfect load balance
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "FwinnerHost:456", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "FwinnerHost:456"; // winner url
    testData[5] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=1/3 - DS=1 ; RS2: TL=2/3 - CL=2/3 - DS=2
     * Excepted winner: RS2 -> already load balanced so choose server with the
     * highest weight
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "GwinnerHost:456", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "GwinnerHost:456"; // winner url
    testData[6] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=1/3 - DS=2 ; RS2: TL=2/3 - CL=2/3 - DS=4
     * Excepted winner: RS2 -> already load balanced so choose server with the
     * highest weight
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "HwinnerHost:456", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    connectedDSs.add(103);
    connectedDSs.add(104);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "HwinnerHost:456"; // winner url
    testData[7] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/6 - CL=1/6 - DS=1 ; RS2: TL=2/6 - CL=2/6 - DS=2 ; RS3: TL=3/6 - CL=3/6 - DS=3
     * Excepted winner: RS3 -> already load balanced so choose server with the
     * highest weight
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "IwinnerHost:789", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    connectedDSs.add(202);
    connectedDSs.add(203);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "IwinnerHost:789"; // winner url
    testData[8] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=5/10 - CL=3/9 - DS=3 ; RS2: TL=3/10 - CL=5/9 - DS=5 ; RS3: TL=2/10 - CL=1/9 - DS=1
     * Excepted winner: RS1 -> misses more DSs than RS3
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "JwinnerHost:123", 0L, (byte)1, 5);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    connectedDSs.add(103);
    connectedDSs.add(104);
    connectedDSs.add(105);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = -1; // current RS id
    params[2] = -1; // local DS id
    params[3] = "JwinnerHost:123"; // winner url
    testData[9] = params;
    /*************************
     * Already connected tests
     *************************/
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=0.5 - DS=1 ; RS2: TL=0.5 - CL=0.5 - DS=1
     * Excepted winner: RS2 (stay connected to it as load correctly spread)
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "KwinnerHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 12; // current RS id
    params[2] = 101; // local DS id
    params[3] = "KwinnerHost:456"; // winner url
    testData[10] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=1.0 - DS=2 ; RS2: TL=0.5 - CL=0.0 - DS=0
     * Excepted winner: RS2 (one must disconnect from RS1)
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "LwinnerHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 1; // local DS id
    params[3] = null; // winner url
    testData[11] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=1.0 - DS=2 ; RS2: TL=0.5 - CL=0.0 - DS=0
     * Excepted winner: RS1 (one server must disconnect from RS1 but it is the
     * one with the lowest id so not DS with server id 2)
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "MwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 2; // local DS id
    params[3] = "MwinnerHost:123"; // winner url
    testData[12] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.3 - CL=0.3 - DS=6 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
     * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.2 - DS=4
     * Excepted winner: RS2 no change as load correctly spread
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    connectedDSs.add(4);
    connectedDSs.add(5);
    connectedDSs.add(6);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "NwinnerHost:456", 0L, (byte)1, 4);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    connectedDSs.add(103);
    connectedDSs.add(104);
    connectedDSs.add(105);
    connectedDSs.add(106);
    connectedDSs.add(107);
    connectedDSs.add(108);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    connectedDSs.add(202);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(301);
    connectedDSs.add(302);
    connectedDSs.add(303);
    connectedDSs.add(304);
    rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 12; // current RS id
    params[2] = 101; // local DS id
    params[3] = "NwinnerHost:456"; // winner url
    testData[13] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
     * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.3 - DS=6
     * Excepted winner: RS2: no change load ok on current server and there is the
     * possibility to arrange load for other servers with disconnection from
     * 2 DSs from RS4 and reconnect them to RS1 (we moved these 2 servers from
     * previous test where the loads were ok)
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    connectedDSs.add(4);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "OwinnerHost:456", 0L, (byte)1, 4);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    connectedDSs.add(103);
    connectedDSs.add(104);
    connectedDSs.add(105);
    connectedDSs.add(106);
    connectedDSs.add(107);
    connectedDSs.add(108);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    connectedDSs.add(202);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(301);
    connectedDSs.add(302);
    connectedDSs.add(303);
    connectedDSs.add(304);
    connectedDSs.add(305);
    connectedDSs.add(306);
    rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 12; // current RS id
    params[2] = 101; // local DS id
    params[3] = "OwinnerHost:456"; // winner url
    testData[14] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
     * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.3 - DS=6
     * Excepted winner: RS4 : 2 DSs should go away from RS4 and server id 302
     * is one of the two lowest ids connected to RS4
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "PwinnerHost:123", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    connectedDSs.add(4);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 4);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    connectedDSs.add(103);
    connectedDSs.add(104);
    connectedDSs.add(105);
    connectedDSs.add(106);
    connectedDSs.add(107);
    connectedDSs.add(108);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    connectedDSs.add(202);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(306);
    connectedDSs.add(305);
    connectedDSs.add(304);
    connectedDSs.add(303);
    connectedDSs.add(302);
    connectedDSs.add(301);
    rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 14; // current RS id
    params[2] = 302; // local DS id
    params[3] = null; // winner url
    testData[15] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
     * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.3 - DS=6
     * Excepted winner: RS1 : 2 DSs should go away from RS4 but server id 303
     * is not one of the two lowest ids connected to RS4
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    connectedDSs.add(4);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 4);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(101);
    connectedDSs.add(102);
    connectedDSs.add(103);
    connectedDSs.add(104);
    connectedDSs.add(105);
    connectedDSs.add(106);
    connectedDSs.add(107);
    connectedDSs.add(108);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    connectedDSs.add(202);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(14, "QwinnerHost:1011", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(306);
    connectedDSs.add(305);
    connectedDSs.add(304);
    connectedDSs.add(303);
    connectedDSs.add(302);
    connectedDSs.add(301);
    rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 14; // current RS id
    params[2] = 303; // local DS id
    params[3] = "QwinnerHost:1011"; // winner url
    testData[16] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.65 - DS=13 ;
     * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.05 - DS=1
     * Excepted winner: RS2: no change load ok on current server and there is the
     * possibility to arrange load for other servers with disconnection from
     * 2 DSs from RS4 and reconnect them to RS1 (we moved these 2 servers from
     * previous test where the loads were ok)
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    connectedDSs.add(4);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 4);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(113);
    connectedDSs.add(112);
    connectedDSs.add(111);
    connectedDSs.add(110);
    connectedDSs.add(109);
    connectedDSs.add(108);
    connectedDSs.add(107);
    connectedDSs.add(106);
    connectedDSs.add(105);
    connectedDSs.add(104);
    connectedDSs.add(103);
    connectedDSs.add(102);
    connectedDSs.add(101);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(201);
    connectedDSs.add(202);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(301);
    rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 12; // current RS id
    params[2] = 105; // local DS id
    params[3] = null; // winner url
    testData[17] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=2/3 - DS=2 ; RS2: TL=0.5 - CL=1/3 - DS=1
     * Excepted winner: RS1. Local server should stay connected to current one
     * as the balance cannot be done. We already have the nearest possible
     * balance to the load goals: disconnection would cause a yoyo effect and
     * the local server would not stop going and coming back to/from the other
     * RS.
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "RwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(3);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 1; // local DS id
    params[3] = "RwinnerHost:123"; // winner url
    testData[18] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=0.5 - CL=2/3 - DS=2 ; RS2: TL=0.5 - CL=1/3 - DS=1
     * Excepted winner: RS1. Local server should stay connected to current one
     * as the balance cannot be done. We already have the nearest possible
     * balance to the load goals: disconnection would cause a yoyo effect and
     * the local server would not stop going and coming back to/from the other
     * RS.
     * Note: Same test as before, but not with the lowest local DS server id
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "SwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(3);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 2; // local DS id
    params[3] = "SwinnerHost:123"; // winner url
    testData[19] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=2/4 - DS=2 ; RS2: TL=1/3 - CL=1/4 - DS=1 ; RS3: TL=1/3 - CL=1/4 - DS=1
     * Excepted winner: RS1. Local server should stay connected to current one
     * as the balance cannot be done. We already have the nearest possible
     * balance to the load goals: disconnection would cause a yoyo effect and
     * the local server would not stop going and coming back between RSs.
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "TwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(3);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(4);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 1; // local DS id
    params[3] = "TwinnerHost:123"; // winner url
    testData[20] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=3/7 - DS=3 ; RS2: TL=1/3 - CL=2/7 - DS=2 ; RS3: TL=1/3 - CL=2/7 - DS=2
     * Excepted winner: RS1. Local server should stay connected to current one
     * as the balance cannot be done. We already have the nearest possible
     * balance to the load goals: disconnection would cause a yoyo effect and
     * the local server would not stop going and coming back between RSs.
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "UwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    connectedDSs.add(3);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(4);
    connectedDSs.add(5);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(6);
    connectedDSs.add(7);
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 1; // local DS id
    params[3] = "UwinnerHost:123"; // winner url
    testData[21] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=2/3 - DS=2 ; RS2: TL=1/3 - CL=1/3 - DS=1 ; RS3: TL=1/3 - CL=0 - DS=0
     * Excepted winner: RS3. Local server should disconnect for reconnection to
     * RS3
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(3);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "VwinnerHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 1; // local DS id
    params[3] = null; // winner url
    testData[22] = params;
    /**
     * TL = target load
     * CL = current load
     * DS = connected DSs number
     * RS1: TL=1/3 - CL=2/3 - DS=2 ; RS2: TL=1/3 - CL=1/3 - DS=1 ; RS3: TL=1/3 - CL=0 - DS=0
     * Excepted winner: RS3. Local server (2) should stay connected while
     * DS server id 1 should disconnect for reconnection to RS3
     */
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    rsInfo = new RSInfo(11, "WwinnerHost:123", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(1);
    connectedDSs.add(2);
    rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    connectedDSs.add(3);
    rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
    rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
    connectedDSs = new ArrayList<Integer>();
    rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
    params = new Object[4];
    params[0] = rsInfos;
    params[1] = 11; // current RS id
    params[2] = 2; // local DS id
    params[3] = "WwinnerHost:123"; // winner url
    testData[23] = params;
    return testData;
  }
  /**
   * Test the method that chooses the best RS using the RS weights
   */
  @Test(dataProvider =  "testComputeBestServerForWeightProvider")
  public void testComputeBestServerForWeight(
      Map<Integer, ReplicationServerInfo> servers, int currentRsServerId,
      int localServerId, String winnerUrl)
      throws Exception
  {
    String testCase = "testComputeBestServerForWeight";
    debugInfo("Starting " + testCase);
    ReplicationServerInfo bestServer =
      computeBestServerForWeight(servers, currentRsServerId, localServerId);
    if (winnerUrl == null)
    {
      // We expect null
      String url = null;
      if (bestServer != null)
      {
        url = bestServer.getServerURL();
      }
      assertNull(bestServer, "The best server should be null but is: " + url);
    } else
    {
      assertNotNull(bestServer, "The best server should not be null");
      assertEquals(bestServer.getServerURL(),
        winnerUrl, "Wrong best replication server: " + bestServer.getServerURL());
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
New file
@@ -0,0 +1,1318 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.types.DirectoryException;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.types.DN;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static org.opends.server.TestCaseUtils.*;
/**
 * Test in real situations the algorithm for load balancing the DSs connections
 * to the RSs. This uses the weights of the RSs. We concentrate the tests on
 * weight only: all servers have the same group id, gen id an states.
 */
public class ReplicationServerLoadBalancingTest extends ReplicationTestCase
{
  // Number of DSs
  private static final int NDS = 20;
  // Number of RSs
  private static final int NRS = 4;
  private LDAPReplicationDomain rd[] = new LDAPReplicationDomain[NDS];
  private ReplicationServer rs[] = new ReplicationServer[NRS];
  private int[] rsPort = new int[NRS];
  private static final int RS1_ID = 501;
  private static final int RS2_ID = 502;
  private static final int RS3_ID = 503;
  private static final int RS4_ID = 504;
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  private void initTest()
  {
    for (int i = 0 ; i < NDS; i++)
    {
      rd[i] = null;
    }
    for (int i = 0 ; i < NRS; i++)
    {
      rs[i] = null;
      rsPort[i] = -1;
    }
    findFreePorts();
  }
  /**
   * Find needed free TCP ports.
   */
  private void findFreePorts()
  {
    try
    {
      ServerSocket[] ss = new ServerSocket[NRS];
      for (int i = 0; i < NRS; i++)
      {
        ss[i] = TestCaseUtils.bindFreePort();
        rsPort[i] = ss[i].getLocalPort();
      }
      for (int i = 0; i < NRS; i++)
      {
        ss[i].close();
      }
    } catch (IOException e)
    {
      fail("Unable to determinate some free ports " +
        stackTraceToSingleLineString(e));
    }
  }
  private void endTest()
  {
    for (int i = 0 ; i < NDS; i++)
    {
      if (rd[i] != null)
      {
        rd[i].shutdown();
        rd[i] = null;
      }
    }
    try
    {
      // Clear any reference to a domain in synchro plugin
      MultimasterReplication.deleteDomain(DN.decode(TEST_ROOT_DN_STRING));
    } catch (DirectoryException ex)
    {
      fail("Error deleting reference to domain: " + TEST_ROOT_DN_STRING);
    }
    for (int i = 0; i < NRS; i++)
    {
      if (rs[i] != null)
      {
        rs[i].clearDb();
        rs[i].remove();
        rs[i] = null;
      }
      rsPort[i] = -1;
    }
  }
  /**
   * Creates the list of servers to represent the RS topology matching the
   * passed test case.
   */
  private SortedSet<String> createRSListForTestCase(String testCase)
  {
    SortedSet<String> replServers = new TreeSet<String>();
    if (testCase.equals("testFailoversAndWeightChanges"))
    {
      // 4 servers used for this test case.
      for (int i = 0; i < NRS; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testSpreadLoad"))
    {
      // 4 servers used for this test case.
      for (int i = 0; i < NRS; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testNoYoyo1"))
    {
      // 2 servers used for this test case.
      for (int i = 0; i < 2; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testNoYoyo2"))
    {
      // 3 servers used for this test case.
      for (int i = 0; i < 3; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testNoYoyo3"))
    {
      // 3 servers used for this test case.
      for (int i = 0; i < 3; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else
      fail("Unknown test case: " + testCase);
    return replServers;
  }
  /**
   * Creates a new ReplicationServer.
   */
  private ReplicationServer createReplicationServer(int rsIndex,
    int weight, String testCase)
  {
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      if (testCase.equals("testFailoversAndWeightChanges"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testSpreadLoad"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testNoYoyo1"))
      {
        // 2 servers used for this test case.
        for (int i = 0; i < 2; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testNoYoyo2"))
      {
        // 3 servers used for this test case.
        for (int i = 0; i < 3; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testNoYoyo3"))
      {
        // 3 servers used for this test case.
        for (int i = 0; i < 3; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else
        fail("Unknown test case: " + testCase);
      String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
        replServers, 1, 1000, 5000, weight);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      return replicationServer;
    } catch (Exception e)
    {
      fail("createReplicationServer " + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Returns a suitable RS configuration with the passed new weight
   */
  private ReplicationServerCfg createReplicationServerConfigWithNewWeight
    (int rsIndex, int weight, String testCase)
  {
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      if (testCase.equals("testFailoversAndWeightChanges"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testSpreadLoad"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else
        fail("Unknown test case: " + testCase);
      String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
        replServers, 1, 1000, 5000, weight);
      return conf;
    } catch (Exception e)
    {
      fail("createReplicationServerConfigWithNewWeight " + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Creates a new ReplicationDomain.
   */
  private LDAPReplicationDomain createReplicationDomain(int serverId,
    String testCase)
  {
    SortedSet<String> replServers = null;
    try
    {
      replServers = createRSListForTestCase(testCase);
      DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId+1, replServers, 1);
      LDAPReplicationDomain replicationDomain =
        MultimasterReplication.createNewDomain(domainConf);
      replicationDomain.start();
      return replicationDomain;
    } catch (Exception e)
    {
      fail("createReplicationDomain " + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Basic weight test: starts some RSs with different weights, start some DSs
   * and check the DSs are correctly spread across the RSs
   * @throws Exception If a problem occurred
   */
  @Test
  public void testSpreadLoad() throws Exception
  {
    String testCase = "testSpreadLoad";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /**
       * Start RS1 weigth=1, RS2 weigth=2, RS3 weigth=3, RS4 weigth=4
       */
      // Create and start RS1
      rs[0] = createReplicationServer(0, 1, testCase);
      // Create and start RS2
      rs[1] = createReplicationServer(1, 2, testCase);
      // Create and start RS3
      rs[2] = createReplicationServer(2, 3, testCase);
      // Create and start RS4
      rs[3] = createReplicationServer(3, 4, testCase);
      // Start a first DS to make every RSs inter connect
      rd[0] = createReplicationDomain(0, testCase);
        assertTrue(rd[0].isConnected());
      // Wait for RSs inter-connections
      checkRSConnectionsAndGenId(new int[] {0, 1, 2, 3},
        "Waiting for RSs inter-connections");
      /**
       * Start the 19 other DSs. One should end up with:
       * - RS1 has 2 DSs
       * - RS2 has 4 DSs
       * - RS3 has 6 DSs
       * - RS4 has 8 DSs
       */
      for (int i = 1; i < NDS; i++)
      {
        rd[i] = createReplicationDomain(i, testCase);
        assertTrue(rd[i].isConnected());
      }
     // Now check the number of connected DSs for each RS
     assertEquals(getDSConnectedToRS(0), 2,
       "Wrong expected number of DSs connected to RS1");
     assertEquals(getDSConnectedToRS(1), 4,
       "Wrong expected number of DSs connected to RS2");
     assertEquals(getDSConnectedToRS(2), 6,
       "Wrong expected number of DSs connected to RS3");
     assertEquals(getDSConnectedToRS(3), 8,
       "Wrong expected number of DSs connected to RS4");
    } finally
    {
      endTest();
    }
  }
  /**
   * Return the number of DSs currently connected to the RS with the passed
   * index
   */
  private int getDSConnectedToRS(int rsIndex)
  {
    Iterator<ReplicationServerDomain> rsdIt = rs[rsIndex].getDomainIterator();
    if (rsdIt == null) // No domain yet so no connections yet
      return 0;
    return rsdIt.next().getConnectedDSs().keySet().
      size();
  }
  /**
   * Waits for secTimeout seconds (before failing) that all RSs are connected
   * together and that they have the same generation id.
   * @param rsIndexes List of the indexes of the RSs that should all be
   *        connected together at the end
   * @param msg The message to display if the condition is not met before
   *        timeout
   */
  private void checkRSConnectionsAndGenId(int[] rsIndexes, String msg)
  {
    debugInfo("checkRSConnectionsAndGenId for <" + msg + ">");
    // Number of seconds to wait for condition before failing
    int secTimeout = 30;
    // Number of seconds already passed
    int nSec = 0;
    // Number of RSs to take into account
    int nRSs = rsIndexes.length;
    // Go out of the loop only if connection is verified or if timeout occurs
    while (true)
    {
      // Test connection
      boolean connected = false;
      boolean sameGenId = false;
      Iterator<ReplicationServerDomain> rsdIt = null;
      // Connected together ?
      int nOk = 0;
      for (int i = 0; i < nRSs; i++)
      {
        int rsIndex = rsIndexes[i];
        ReplicationServer repServer = rs[rsIndex];
        rsdIt = repServer.getDomainIterator();
        int curRsId = repServer.getServerId();
        Set<Integer> connectedRSsId = null;
        if (rsdIt != null)
        {
          connectedRSsId = rsdIt.next().getConnectedRSs().keySet();
        } else
        {
          // No domain yet, RS is not yet connected to others
          debugInfo("RS " + curRsId + " has no domain yet");
          break;
        }
        // Does this RS see all other RSs
        int nPeer = 0;
        debugInfo("Checking RSs connected to RS " + curRsId);
        for (int j = 0; j < nRSs; j++)
        {
          int otherRsIndex = rsIndexes[j];
          if (otherRsIndex != rsIndex) // Treat only other RSs
          {
            int otherRsId = otherRsIndex+501;
            if (connectedRSsId.contains(otherRsId))
            {
              debugInfo("\tRS " + curRsId + " sees RS " + otherRsId);
              nPeer++;
            } else
            {
              debugInfo("\tRS " + curRsId + " does not see RS " + otherRsId);
            }
          }
        }
        if (nPeer == nRSs-1)
          nOk++;
      }
      if (nOk == nRSs)
      {
        debugInfo("Connections are ok");
        connected = true;
      } else
      {
        debugInfo("Connections are not ok");
      }
      // Same gen id ?
      long refGenId = -1L;
      boolean refGenIdInitialized = false;
      nOk = 0;
      rsdIt = null;
      for (int i = 0; i < nRSs; i++)
      {
        ReplicationServer repServer = rs[i];
        rsdIt = repServer.getDomainIterator();
        int curRsId = repServer.getServerId();
        Long rsGenId = -1L;
        if (rsdIt != null)
        {
          rsGenId = rsdIt.next().getGenerationId();
        } else
        {
          // No domain yet, RS is not yet connected to others
          debugInfo("RS " + curRsId + " has no domain yet");
          break;
        }
        // Expecting all RSs to have gen id equal and not -1
        if ((rsGenId == -1L))
        {
          debugInfo("\tRS " + curRsId + " gen id is -1 which is not expected");
          break;
        } else
        {
          if (!refGenIdInitialized)
          {
            // Store reference gen id all RSs must have
            refGenId = rsGenId;
            refGenIdInitialized = true;
          }
        }
        if (rsGenId == refGenId)
        {
          debugInfo("\tRS " + curRsId + " gen id is " + rsGenId + " as expected");
          nOk++;
        } else
        {
          debugInfo("\tRS " + curRsId + " gen id is " + rsGenId
            + " but expected " + refGenId);
        }
      }
      if (nOk == nRSs)
      {
        debugInfo("Gen ids are ok");
        sameGenId = true;
      } else
      {
        debugInfo("Gen ids are not ok");
      }
      if (connected && sameGenId)
      {
        // Connection verified
        debugInfo("checkRSConnections: all RSs connected and with same gen id obtained after "
          + nSec + " seconds.");
        return;
      }
      // Sleep 1 second
      try
      {
        Thread.sleep(1000);
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      nSec++;
      if (nSec > secTimeout)
      {
        // Timeout reached, end with error
        fail("checkRSConnections: could not obtain that RSs are connected and have the same gen id after "
          + (nSec-1) + " seconds. [" + msg + "]");
      }
    }
  }
  /**
   * Execute a full scenario with some RSs failovers and dynamic weight changes.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  public void testFailoversAndWeightChanges() throws Exception
  {
    String testCase = "testFailoversAndWeightChanges";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /**
       * RS1 (weight=1) starts
       */
      rs[0] = createReplicationServer(0, 1, testCase);
      /**
       * DS1 starts and connects to RS1
       */
      rd[0] = createReplicationDomain(0, testCase);
      assertTrue(rd[0].isConnected());
      assertEquals(rd[0].getRsServerId(), RS1_ID);
      /**
       * RS2 (weight=1) starts
       */
      rs[1] = createReplicationServer(1, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1},
        "Waiting for RS2 connected to peers");
      /**
       * DS2 starts and connects to RS2
       */
      rd[1] = createReplicationDomain(1, testCase);
      assertTrue(rd[1].isConnected());
      assertEquals(rd[1].getRsServerId(), RS2_ID);
      /**
       * RS3 (weight=1) starts
       */
      rs[2] = createReplicationServer(2, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1, 2},
        "Waiting for RS3 connected to peers");
      /**
       * DS3 starts and connects to RS3
       */
      rd[2] = createReplicationDomain(2, testCase);
      assertTrue(rd[2].isConnected());
      assertEquals(rd[2].getRsServerId(), RS3_ID);
      /**
       * DS4 starts and connects to RS1, RS2 or RS3
       */
      rd[3] = createReplicationDomain(3, testCase);
      assertTrue(rd[3].isConnected());
      int ds4ConnectedRsId = rd[3].getRsServerId();
      assertTrue((ds4ConnectedRsId == RS1_ID) || (ds4ConnectedRsId == RS2_ID) ||
        (ds4ConnectedRsId == RS3_ID),
        "DS4 should be connected to either RS1, RS2 or RS3 but is it is " +
        "connected to RS id " + ds4ConnectedRsId);
      /**
       * DS5 starts and connects to one of the 2 other RSs
       */
      rd[4] = createReplicationDomain(4, testCase);
      assertTrue(rd[4].isConnected());
        int ds5ConnectedRsId = rd[4].getRsServerId();
      assertTrue((ds5ConnectedRsId != ds4ConnectedRsId),
        "DS5 should be connected to a RS which is not the same as the one of " +
        "DS4 (" + ds4ConnectedRsId + ")");
      /**
       * DS6 starts and connects to the RS with one DS
       */
      rd[5] = createReplicationDomain(5, testCase);
      assertTrue(rd[5].isConnected());
        int ds6ConnectedRsId = rd[5].getRsServerId();
      assertTrue((ds6ConnectedRsId != ds4ConnectedRsId) &&
        (ds6ConnectedRsId != ds5ConnectedRsId),
        "DS6 should be connected to a RS which is not the same as the one of " +
        "DS4 (" + ds4ConnectedRsId + ") or DS5 (" + ds5ConnectedRsId + ") : " +
        ds6ConnectedRsId);
      /**
       * DS7 to DS12 start, we must end up with RS1, RS2 and RS3 each with 4 DSs
       */
      for (int i = 6; i < 12; i++)
      {
        rd[i] = createReplicationDomain(i, testCase);
        assertTrue(rd[i].isConnected());
      }
      // Now check the number of connected DSs for each RS
      assertEquals(getDSConnectedToRS(0), 4,
        "Wrong expected number of DSs connected to RS1");
      assertEquals(getDSConnectedToRS(1), 4,
        "Wrong expected number of DSs connected to RS2");
      assertEquals(getDSConnectedToRS(2), 4,
        "Wrong expected number of DSs connected to RS3");
      /**
       * RS4 (weight=1) starts, we must end up with RS1, RS2, RS3 and RS4 each
       * with 3 DSs
       */
      rs[3] = createReplicationServer(3, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1, 2, 3},
        "Waiting for RS4 connected to peers");
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {3, 3, 3, 3}},
        "RS4 started, each RS should have 3 DSs connected to it");
      /**
       * Change RS3 weight from 1 to 3, we must end up with RS1, RS2 and RS4
       * each with 2 DSs and RS3 with 6 DSs
       */
      // Change RS3 weight to 3
      ReplicationServerCfg newRSConfig =
        createReplicationServerConfigWithNewWeight(2, 3, testCase);
      rs[2].applyConfigurationChange(newRSConfig);
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 2, 6, 2}},
        "RS3 changed weight from 1 to 3");
      /**
       * DS13 to DS20 start, we must end up with RS1, RS2 and RS4 each with 3
       * or 4 DSs (1 with 4 and the 2 others with 3) and RS3 with 10 DSs
       */
      for (int i = 12; i < 20; i++)
      {
        rd[i] = createReplicationDomain(i, testCase);
        assertTrue(rd[i].isConnected());
      }
      int rsWith4DsIndex = -1; // The RS (index) that has 4 DSs
      // Now check the number of connected DSs for each RS
      int rs1ConnectedDSNumber = getDSConnectedToRS(0);
      assertTrue(((rs1ConnectedDSNumber == 3) || (rs1ConnectedDSNumber == 4)),
        "Wrong expected number of DSs connected to RS1: " +
        rs1ConnectedDSNumber);
      if (rs1ConnectedDSNumber == 4)
        rsWith4DsIndex = 0;
      int rs2ConnectedDSNumber = getDSConnectedToRS(1);
      assertTrue(((rs2ConnectedDSNumber == 3) || (rs2ConnectedDSNumber == 4)),
        "Wrong expected number of DSs connected to RS2: " +
        rs2ConnectedDSNumber);
      if (rs2ConnectedDSNumber == 4)
        rsWith4DsIndex = 1;
      int rs4ConnectedDSNumber = getDSConnectedToRS(3);
      assertTrue(((rs4ConnectedDSNumber == 3) || (rs4ConnectedDSNumber == 4)),
        "Wrong expected number of DSs connected to RS4: " +
        rs4ConnectedDSNumber);
      if (rs4ConnectedDSNumber == 4)
        rsWith4DsIndex = 3;
      int sumOfRs1Rs2Rs4 = rs1ConnectedDSNumber + rs2ConnectedDSNumber +
        rs4ConnectedDSNumber;
      assertEquals(sumOfRs1Rs2Rs4, 10, "Expected 10 DSs connected to RS1, RS2" +
        " and RS4");
      assertEquals(getDSConnectedToRS(2), 10,
        "Wrong expected number of DSs connected to RS3");
      /**
       * Stop 2 DSs from RS3, one should end up with RS1 has 3 DSs, RS2 has 3
       * DSs, RS3 has 9 DSs and RS4 has 3 DSs (with DS (with the lowest server
       * id) from the RS that had 4 DSs that went to RS3)
       */
      // Determine the lowest id of DSs connected to the RS with 4 DSs
      Set<Integer> fourDsList = rs[rsWith4DsIndex].getDomainIterator().next().
        getConnectedDSs().keySet();
      assertEquals(fourDsList.size(), 4);
      int lowestDsId = Integer.MAX_VALUE;
      for (int id : fourDsList)
      {
        if (id < lowestDsId)
          lowestDsId = id;
      }
      // Get 2 DS ids of 2 DSs connected to RS3 and stop matching DSs
      Iterator<Integer> dsIdIt = rs[2].getDomainIterator().next().
        getConnectedDSs().keySet().iterator();
      int aFirstDsOnRs3Id = dsIdIt.next() - 1;
      rd[aFirstDsOnRs3Id].shutdown();
      int aSecondDsOnRs3Id = dsIdIt.next() - 1;
      rd[aSecondDsOnRs3Id].shutdown();
      // Check connections
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {3, 3, 9, 3}},
        "2 DSs ("+ aFirstDsOnRs3Id + "," + aSecondDsOnRs3Id +
        ") have been stopped from RS3, DS with lowest id (" + lowestDsId +
        ") should have moved from the RS with 4 DS (RS " +
        (rsWith4DsIndex+501) + ") to RS3");
      // Check that the right DS moved away from the RS with 4 DSs and went to
      // RS3 and that the 3 others did not move
      Set<Integer> dsOnRs3List = rs[2].getDomainIterator().next().
        getConnectedDSs().keySet();
      assertTrue(dsOnRs3List.contains(lowestDsId), "DS with the lowest id (" +
        lowestDsId + " should have come to RS3");
      Set<Integer> threeDsList = rs[rsWith4DsIndex].getDomainIterator().next().
        getConnectedDSs().keySet();
      assertEquals(threeDsList.size(), 3);
      for (int id : threeDsList)
      {
        assertTrue(fourDsList.contains(id), "DS " + id + " should still be on "
          + "RS " + (rsWith4DsIndex+501));
      }
      /**
       * Start the 2 stopped DSs again, we must end up with RS1, RS2 and RS4
       * each with 3 or 4 DSs (1 with 4 and the 2 others with 3) and RS3 with
       * 10 DSs
       */
      // Restart the 2 stopped DSs
      rd[aFirstDsOnRs3Id] = createReplicationDomain(aFirstDsOnRs3Id, testCase);
      assertTrue(rd[aFirstDsOnRs3Id].isConnected());
      rd[aSecondDsOnRs3Id] = createReplicationDomain(aSecondDsOnRs3Id, testCase);
      assertTrue(rd[aSecondDsOnRs3Id].isConnected());
      // Now check the number of connected DSs for each RS
      rs1ConnectedDSNumber = getDSConnectedToRS(0);
      assertTrue(((rs1ConnectedDSNumber == 3) || (rs1ConnectedDSNumber == 4)),
        "Wrong expected number of DSs connected to RS1: " +
        rs1ConnectedDSNumber);
      rs2ConnectedDSNumber = getDSConnectedToRS(1);
      assertTrue(((rs2ConnectedDSNumber == 3) || (rs2ConnectedDSNumber == 4)),
        "Wrong expected number of DSs connected to RS2: " +
        rs2ConnectedDSNumber);
      rs4ConnectedDSNumber = getDSConnectedToRS(3);
      assertTrue(((rs4ConnectedDSNumber == 3) || (rs4ConnectedDSNumber == 4)),
        "Wrong expected number of DSs connected to RS4: " +
        rs4ConnectedDSNumber);
      sumOfRs1Rs2Rs4 = rs1ConnectedDSNumber + rs2ConnectedDSNumber +
        rs4ConnectedDSNumber;
      assertEquals(sumOfRs1Rs2Rs4, 10, "Expected 10 DSs connected to RS1, RS2" +
        " and RS4");
      assertEquals(getDSConnectedToRS(2), 10,
        "Wrong expected number of DSs connected to RS3");
      /**
       * Change RS2 weight to 2, RS3 weight to 4, RS4 weight to 3, we must end
       * up with RS1 has 2 DSs, RS2 has 4 DSs, RS3 has 8 DSs and RS4 has 6 DSs
       */
      // Change RS2 weight to 2
      newRSConfig = createReplicationServerConfigWithNewWeight(1, 2, testCase);
      rs[1].applyConfigurationChange(newRSConfig);
      // Change RS3 weight to 4
      newRSConfig = createReplicationServerConfigWithNewWeight(2, 4, testCase);
      rs[2].applyConfigurationChange(newRSConfig);
      // Change RS4 weight to 3
      newRSConfig = createReplicationServerConfigWithNewWeight(3, 3, testCase);
      rs[3].applyConfigurationChange(newRSConfig);
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 4, 8, 6}},
        "Changed RS2, RS3 and RS4 weights");
      /**
       * Stop RS2 and RS4, we must end up with RS1 has 4 DSs, and RS3 has 16 DSs
       */
      // Stop RS2
      rs[1].clearDb();
      rs[1].remove();
      // Stop RS4
      rs[3].clearDb();
      rs[3].remove();
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {4, -1, 16, -1}},
        "Stopped RS2 and RS4");
      /**
       * Restart RS2 and RS4 with same weights (2 and 3), we must end up with
       * RS1 has 2 DSs, RS2 has 4 DSs, RS3 has 8 DSs and RS4 has 6 DSs
       */
      // Restart RS2
      rs[1] = createReplicationServer(1, 2, testCase);
      // Restart RS4
      rs[3] = createReplicationServer(3, 3, testCase);
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 4, 8, 6}},
        "Restarted RS2 and RS4");
      /**
       * Stop RS3, we must end up with RS1 has 3 DSs, and RS2 has 7 DSs and
       * RS4 has 10 DSs
       */
      // Stop RS3
      rs[2].clearDb();
      rs[2].remove();
      checkForCorrectNumbersOfConnectedDSs(new int[][]{
        new int[] {2, 8, -1, 10},
        new int[] {3, 7, -1, 10},
        new int[] {3, 8, -1, 9},
        new int[] {4, 6, -1, 10},
        new int[] {4, 7, -1, 9},
        new int[] {5, 6, -1, 9}},
        "Stopped RS3");
      /**
       * Restart RS3 with same weight (4), we must end up with RS1 has 2 DSs,
       * RS2 has 4 DSs, RS3 has 8 DSs and RS4 has 6 DSs
       */
      // Restart RS3
      rs[2] = createReplicationServer(2, 4, testCase);
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 4, 8, 6}},
        "Restarted RS2 and RS4");
      /**
       * Stop RS1, RS2 and RS3, all DSs should be connected to RS4
       */
      // Stop RS1
      rs[0].clearDb();
      rs[0].remove();
      // Stop RS2
      rs[1].clearDb();
      rs[1].remove();
      // Stop RS3
      rs[2].clearDb();
      rs[2].remove();
      checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {-1, -1, -1, 20}},
        "Stopped RS1, RS2 and RS3");
    } finally
    {
      endTest();
    }
  }
  // Translate an int array into a human readable string
  private static String intArrayToString(int[] ints)
  {
    StringBuffer sb = new StringBuffer("[");
    for (int i = 0; i < ints.length; i++)
    {
      if (i != 0)
        sb.append(",");
      sb.append(ints[i]);
    }
    sb.append("]");
    return sb.toString();
  }
  // Translate an int[][] array into a human readable string
  private static String intArrayToString(int[][] ints)
  {
    StringBuffer sb = new StringBuffer("[");
    for (int i = 0; i < ints.length; i++)
    {
      if (i != 0)
        sb.append(",");
      sb.append(intArrayToString(ints[i]));
    }
    sb.append("]");
    return sb.toString();
  }
  /**
   * Wait for the correct number of connected DSs for each RS. Fails if timeout
   * before condition met.
   * @param possibleExpectedDSsNumbers The expected number of connected DSs for each
   * RS. -1 if the matching RS should not be taken into account. This is a list of
   * possible expected situation
   * @param msg The message to display if the condition is not met before
   *        timeout
   */
  private void checkForCorrectNumbersOfConnectedDSs(int[][] possibleExpectedDSsNumbers,
    String msg)
  {
    // Time to wait before condition met: warning, this should let enough
    // time to the topology to auto-balance. Currently  this must at least let
    // enough time to a topo message being received and to monitoring messages
    // being received after (2 monitoring publisher period)
    int secTimeout = 30;
    int nSec = 0;
    // To display what has been seen
    int[] finalDSsNumbers = new int[possibleExpectedDSsNumbers[0].length];
    // Go out of the loop only if connection is verified or if timeout occurs
    while (true)
    {
      for (int i = 0; i < possibleExpectedDSsNumbers.length; i++)
      {
        // Examine next possible final situation
        int[] expectedDSsNumbers = possibleExpectedDSsNumbers[i];
        // Examine connections
        int nOk = 0; // Number of RSs ok
        int nRSs = 0; // Number of RSs to examine
        for (int j = 0; j < finalDSsNumbers.length; j++)
        {
          int expectedDSNumber = expectedDSsNumbers[j];
          if (expectedDSNumber != -1)
          {
            nRSs++;
            // Check for number of DSs connected to this RS
            int connectedDSs = getDSConnectedToRS(j);
            if (connectedDSs == expectedDSNumber)
            {
              nOk++;
            }
            // Store result for this RS
            finalDSsNumbers[j] = connectedDSs;
          }
          else
          {
            // Store result for this RS
            finalDSsNumbers[j] = -1;
          }
        }
        if (nOk == nRSs)
        {
          // Connection verified
          debugInfo("checkForCorrectNumbersOfConnectedDSs: got expected " +
            "connections " + intArrayToString(expectedDSsNumbers) + " after " + nSec +
            " seconds.");
          return;
        }
      }
      // Sleep 1 second
      try
      {
        Thread.sleep(1000);
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      nSec++;
      if (nSec > secTimeout)
      {
        // Timeout reached, end with error
        fail("checkForCorrectNumbersOfConnectedDSs: could not get expected " +
          "connections " + intArrayToString(possibleExpectedDSsNumbers) + " after " + (nSec-1) +
          " seconds. Got this result : " + intArrayToString(finalDSsNumbers) +
          " [" + msg + "]");
      }
    }
  }
  /**
   * In a topology where the balance cannot be exactly reached according to the
   * weights, this is testing that the DS is not doing yoyo. The yoyo effect
   * would be a DS keeping going between RSs (going to/back from other RS for
   * ever).
   *
   * RS1 weight=1 ;  RS2 weight=1 ; 3DSs.
   * We expect two DSs on one RS and the last one on the other RS and no
   * disconnections/reconnections after the very first connections.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  public void testNoYoyo1() throws Exception
  {
    String testCase = "testNoYoyo1";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /**
       * RS1 (weight=1) starts
       */
      rs[0] = createReplicationServer(0, 1, testCase);
      /**
       * DS1 starts and connects to RS1
       */
      rd[0] = createReplicationDomain(0, testCase);
      assertTrue(rd[0].isConnected());
      assertEquals(rd[0].getRsServerId(), RS1_ID);
      /**
       * RS2 (weight=1) starts
       */
      rs[1] = createReplicationServer(1, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1},
        "Waiting for RS2 connected to peers");
      /**
       * DS2 starts and connects to RS2
       */
      rd[1] = createReplicationDomain(1, testCase);
      assertTrue(rd[1].isConnected());
      assertEquals(rd[1].getRsServerId(), RS2_ID);
      /**
       * DS3 starts and connects to either RS1 or RS2 but should stay on it
       */
      int dsIsIndex = 2;
      rd[dsIsIndex] = createReplicationDomain(dsIsIndex, testCase);
      assertTrue(rd[dsIsIndex].isConnected());
      int rsId = rd[dsIsIndex].getRsServerId();
      int rsIndex = rsId - 501;
      int nDSs = getDSConnectedToRS(rsIndex);
      assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
          rsId);
      debugInfo(testCase + ": DS3 connected to RS " + rsId + ", with " + nDSs
        + " DSs");
      // Be sure that DS3 stays connected to the same RS during some long time
      // check every second
      int waitTime = 10;
      int elapsedTime = 0;
      while (elapsedTime < waitTime)
      {
        Thread.sleep(1000);
        // Still connected to the right RS ?
        assertEquals(rd[dsIsIndex].getRsServerId(), rsId, "DS3 should still be " +
          "connected to RS " + rsId);
        assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
          rsId);
        elapsedTime++;
      }
    } finally
    {
      endTest();
    }
  }
  /**
   * In a topology where the balance cannot be exactly reached according to the
   * weights, this is testing that the DS is not doing yoyo. The yoyo effect
   * would be a DS keeping going between RSs (going to/back from other RS for
   * ever).
   *
   * RS1 weight=1 ;  RS2 weight=1 ; RS3 weight=1 ; 4DSs.
   * We expect 1 RS with 2 DSs and the 2 other RSs with 1 DS each and no
   * disconnections/reconnections after the very first connections.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  public void testNoYoyo2() throws Exception
  {
    String testCase = "testNoYoyo2";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /**
       * RS1 (weight=1) starts
       */
      rs[0] = createReplicationServer(0, 1, testCase);
      /**
       * DS1 starts and connects to RS1
       */
      rd[0] = createReplicationDomain(0, testCase);
      assertTrue(rd[0].isConnected());
      assertEquals(rd[0].getRsServerId(), RS1_ID);
      /**
       * RS2 (weight=1) and R3 (weight=1) start
       */
      rs[1] = createReplicationServer(1, 1, testCase);
      rs[2] = createReplicationServer(2, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1, 2},
        "Waiting for RSs being connected to peers");
      /**
       * DS2 to DS3 start and connects to RSs
       */
      for (int i = 1; i < 3; i++)
      {
        rd[i] = createReplicationDomain(i, testCase);
        assertTrue(rd[i].isConnected());
      }
      /**
       * DS4 starts and connects to either RS1 RS2 or RS3 but should stay on it
       */
      int dsIsIndex = 3;
      rd[dsIsIndex] = createReplicationDomain(dsIsIndex, testCase);
      assertTrue(rd[dsIsIndex].isConnected());
      int rsId = rd[dsIsIndex].getRsServerId();
      int rsIndex = rsId - 501;
      int nDSs = getDSConnectedToRS(rsIndex);
      assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
          rsId);
      debugInfo(testCase + ": DS4 connected to RS " + rsId + ", with " + nDSs
        + " DSs");
      // Be sure that DS3 stays connected to the same RS during some long time
      // check every second
      int waitTime = 10;
      int elapsedTime = 0;
      while (elapsedTime < waitTime)
      {
        Thread.sleep(1000);
        // Still connected to the right RS ?
        assertEquals(rd[dsIsIndex].getRsServerId(), rsId, "DS4 should still be " +
          "connected to RS " + rsId);
        assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
          rsId);
        elapsedTime++;
      }
    } finally
    {
      endTest();
    }
  }
  /**
   * In a topology where the balance cannot be exactly reached according to the
   * weights, this is testing that the DS is not doing yoyo. The yoyo effect
   * would be a DS keeping going between RSs (going to/back from other RS for
   * ever).
   *
   * RS1 weight=1 ;  RS2 weight=1 ; RS3 weight=1 ; 7DSs.
   * We expect 1 RS with 3 DSs and the 2 other RSs with 2 DS each and no
   * disconnections/reconnections after the very first connections.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  public void testNoYoyo3() throws Exception
  {
    String testCase = "testNoYoyo3";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /**
       * RS1 (weight=1) starts
       */
      rs[0] = createReplicationServer(0, 1, testCase);
      /**
       * DS1 starts and connects to RS1
       */
      rd[0] = createReplicationDomain(0, testCase);
      assertTrue(rd[0].isConnected());
      assertEquals(rd[0].getRsServerId(), RS1_ID);
      /**
       * RS2 (weight=1) and R3 (weight=1) start
       */
      rs[1] = createReplicationServer(1, 1, testCase);
      rs[2] = createReplicationServer(2, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1, 2},
        "Waiting for RSs being connected to peers");
      /**
       * DS2 to DS6 start and connects to RSs
       */
      for (int i = 1; i < 6; i++)
      {
        rd[i] = createReplicationDomain(i, testCase);
        assertTrue(rd[i].isConnected());
      }
      /**
       * DS7 starts and connects to either RS1 RS2 or RS3 but should stay on it
       */
      int dsIsIndex = 6;
      rd[dsIsIndex] = createReplicationDomain(dsIsIndex, testCase);
      assertTrue(rd[dsIsIndex].isConnected());
      int rsId = rd[dsIsIndex].getRsServerId();
      int rsIndex = rsId - 501;
      int nDSs = getDSConnectedToRS(rsIndex);
      assertEquals(getDSConnectedToRS(rsIndex), 3, " Expected 2 DSs on RS " +
          rsId);
      debugInfo(testCase + ": DS7 connected to RS " + rsId + ", with " + nDSs
        + " DSs");
      // Be sure that DS3 stays connected to the same RS during some long time
      // check every second
      int waitTime = 10;
      int elapsedTime = 0;
      while (elapsedTime < waitTime)
      {
        Thread.sleep(1000);
        // Still connected to the right RS ?
        assertEquals(rd[dsIsIndex].getRsServerId(), rsId, "DS7 should still be " +
          "connected to RS " + rsId);
        assertEquals(getDSConnectedToRS(rsIndex), 3, " Expected 2 DSs on RS " +
          rsId);
        elapsedTime++;
      }
    } finally
    {
      endTest();
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -22,10 +22,13 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -37,6 +40,7 @@
import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashSet;
@@ -821,22 +825,34 @@
  private RSInfo createRSInfo(int rsId)
  {
    int groupId = -1;
    String serverUrl = null;
    String localHostname = null;
    try
    {
      localHostname = InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException ex)
    {
      fail("Could not get local host name: " + ex.getMessage());
    }
    switch (rsId)
    {
      case RS1_ID:
        groupId = RS1_GID;
        serverUrl = localHostname + ":" + rs1Port;
        break;
      case RS2_ID:
        groupId = RS2_GID;
        serverUrl = localHostname + ":" + rs2Port;
        break;
      case RS3_ID:
        groupId = RS3_GID;
        serverUrl = localHostname + ":" + rs3Port;
        break;
      default:
        fail("Unknown replication server id.");
    }
    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
    return new RSInfo(rsId, serverUrl, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -1092,11 +1092,11 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
    RSInfo rsInfo1 = new RSInfo(4527, null, (long)45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
    RSInfo rsInfo2 = new RSInfo(4527, null, (long)0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
    RSInfo rsInfo3 = new RSInfo(0, null, (long)-21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -1026,13 +1026,13 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
    RSInfo rsInfo1 = new RSInfo(4527, "rsHost1:123", (long)45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
    RSInfo rsInfo2 = new RSInfo(4527, "rsHost2:456", (long)0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
    RSInfo rsInfo3 = new RSInfo(0, "rsHost3:789", (long)-21113, (byte)98, 1);
    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1);
    RSInfo rsInfo4 = new RSInfo(45678, "rsHost4:1011", (long)-21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -969,7 +969,7 @@
        }
        // Send our topo mesg
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
        RSInfo rsInfo = new RSInfo(serverId, fakeUrl, generationId, groupId, 1);
        List<RSInfo> rsInfos = new ArrayList<RSInfo>();
        rsInfos.add(rsInfo);
        TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);