mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.27.2013 4bb3d3af3d032a98f2ca318c81be5c4f81034b8f
ReplicationBroker.java:
In computeInitialServerStatus(), used early exits.
In computeBestServerForWeight(), extracted methods computeBestServerWhenNotConnected() and computeBestServerWhenConnected().
Changed replicationServerUrls from Collection<String> to Set<String>.
Removed useless field initialization to null.
Renamed _publish() to publish() + reduced local variables scope.
In receive(), renamed local variable replicationServerID to previousRsServerID and used this one more rather than the field.
In changeConfig(), used Set.equals().
Changed getReplicationMonitor() to getReplicationMonitorInstanceName().

ReplicationDomain.java:
Consequence of the change to ReplicationBroker.getReplicationMonitor(), inlined getReplicationMonitorInstanceName().
Consequence of the change to ReplicationBroker.replicationServerUrls.

TestCaseUtils.java:
Moved newSet(), newSortedSet(), newList() here from test classes.

*Test.java:
Consequence of the changes to ReplicationBroker.replicationServerUrls.
12 files modified
776 ■■■■ changed files
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 564 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 42 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java 16 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 13 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java 9 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 10 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 44 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 17 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 23 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 22 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,7 +31,10 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.*;
import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +75,7 @@
  /**
   * Replication server URLs under this format: "<code>hostname:port</code>".
   */
  private volatile Collection<String> replicationServerUrls;
  private volatile Set<String> replicationServerUrls;
  private volatile boolean connected = false;
  /**
   * String reported under CSN=monitor when there is no connected RS.
@@ -98,9 +101,9 @@
  /** The server id of the RS we are connected to. */
  private Integer rsServerId = -1;
  /** The server URL of the RS we are connected to. */
  private String rsServerUrl = null;
  private String rsServerUrl;
  /** Our replication domain. */
  private ReplicationDomain domain = null;
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
@@ -121,7 +124,7 @@
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  private HeartbeatMonitor heartbeatMonitor;
  /**
   * The number of times the connection was lost.
   */
@@ -140,7 +143,7 @@
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /**
   * The expected period in milliseconds between these messages are sent
   * to the replication server. Zero means heartbeats are off.
@@ -149,8 +152,11 @@
  /*
   * Properties for the last topology info received from the network.
   */
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  /**
   * Info for other DSs.
   * <p>
   * Warning: does not contain info for us (for our server id)
   */
  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
  private volatile long generationID;
  private volatile int updateDoneCount = 0;
@@ -162,8 +168,7 @@
   * replication server one wants to connect. Key: replication server id Value:
   * replication server info for the matching replication server id
   */
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
    = null;
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
  /**
   * This integer defines when the best replication server checking algorithm
@@ -256,7 +261,7 @@
   *
   * @param replicationServers list of servers used
   */
  public void start(Collection<String> replicationServers)
  public void start(Set<String> replicationServers)
  {
    synchronized (startStopLock)
    {
@@ -1068,60 +1073,45 @@
    {
      // RS has no generation id
      return ServerStatus.NORMAL_STATUS;
    } else
    }
    else if (rsGenId != dsGenId)
    {
      if (rsGenId == dsGenId)
      // DS and RS do not have same generation id
      return ServerStatus.BAD_GEN_ID_STATUS;
    }
    else
    {
      /*
      DS and RS have same generation id
      Determine if we are late or not to replay changes. RS uses a
      threshold value for pending changes to be replayed by a DS to
      determine if the DS is in normal status or in degraded status.
      Let's compare the local and remote server state using  this threshold
      value to determine if we are late or not
      */
      int nChanges = ServerState.diffChanges(rsState, state);
      if (debugEnabled())
      {
        /*
        DS and RS have same generation id
        Determine if we are late or not to replay changes. RS uses a
        threshold value for pending changes to be replayed by a DS to
        determine if the DS is in normal status or in degraded status.
        Let's compare the local and remote server state using  this threshold
        value to determine if we are late or not
        */
        ServerStatus initStatus;
        int nChanges = ServerState.diffChanges(rsState, state);
        if (debugEnabled())
        {
          TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
              + serverId + " computed " + nChanges + " changes late.");
        }
        /*
        Check status to know if it is relevant to change the status. Do not
        take RSD lock to test. If we attempt to change the status whereas
        we are in a status that do not allows that, this will be noticed by
        the changeStatusFromStatusAnalyzer method. This allows to take the
        lock roughly only when needed versus every sleep time timeout.
        */
        if (degradedStatusThreshold > 0)
        {
          if (nChanges >= degradedStatusThreshold)
          {
            initStatus = ServerStatus.DEGRADED_STATUS;
          } else
          {
            initStatus = ServerStatus.NORMAL_STATUS;
          }
        } else
        {
          /*
          0 threshold value means no degrading system used (no threshold):
          force normal status
          */
          initStatus = ServerStatus.NORMAL_STATUS;
        }
        return initStatus;
      } else
      {
        // DS and RS do not have same generation id
        return ServerStatus.BAD_GEN_ID_STATUS;
        TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
            + serverId + " computed " + nChanges + " changes late.");
      }
      /*
      Check status to know if it is relevant to change the status. Do not
      take RSD lock to test. If we attempt to change the status whereas
      we are in a status that does not allow that, this will be noticed by
      the changeStatusFromStatusAnalyzer method. This allows to take the
      lock roughly only when needed versus every sleep time timeout.
      */
      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
      {
        return ServerStatus.DEGRADED_STATUS;
      }
      // degradedStatusThreshold value of '0' means no degrading system used
      // (no threshold): force normal status
      return ServerStatus.NORMAL_STATUS;
    }
  }
@@ -1472,20 +1462,16 @@
      return bestServers.values().iterator().next();
    }
      if (firstConnection)
      {
        // We are not connected to a server yet
        return computeBestServerForWeight(bestServers, -1, -1);
      } else
      {
        /*
        We are already connected to a RS: compute the best RS as far as the
        weights is concerned. If this is another one, some DS must
        disconnect.
        */
        return computeBestServerForWeight(bestServers, rsServerId,
          localServerId);
      }
    if (firstConnection)
    {
      // We are not connected to a server yet
      return computeBestServerForWeight(bestServers, -1, -1);
    }
    /*
     * We are already connected to a RS: compute the best RS as far as the
     * weights is concerned. If this is another one, some DS must disconnect.
     */
    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
  }
  /**
@@ -1783,11 +1769,12 @@
      sumOfWeights += replicationServerInfo.getWeight();
      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
    }
    // Distance (difference) of the current loads to the load goals of each RS:
    // key:server id, value: distance
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the dot)
    MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
@@ -1812,199 +1799,210 @@
    if (currentRsServerId == -1)
    {
      // The local server is not connected yet
      // The local server is not connected yet, find best server to connect to,
      // taking the weights into account.
      return computeBestServerWhenNotConnected(bestServers, loadDistances);
    }
    // The local server is currently connected to a RS, let's see if it must
    // disconnect or not, taking the weights into account.
    return computeBestServerWhenConnected(bestServers, loadDistances,
        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
  }
  private static ReplicationServerInfo computeBestServerWhenNotConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances)
  {
    /*
     * Find the server with the current highest distance to its load goal and
     * choose it. Make an exception if every server is correctly balanced,
     * that is every current load distances are equal to 0, in that case,
     * choose the server with the highest weight
     */
    int bestRsId = 0; // If all server equal, return the first one
    float highestDistance = Float.NEGATIVE_INFINITY;
    boolean allRsWithZeroDistance = true;
    int highestWeightRsId = -1;
    int highestWeight = -1;
    for (Integer rsId : bestServers.keySet())
    {
      float loadDistance = loadDistances.get(rsId).floatValue();
      if (loadDistance > highestDistance)
      {
        // This server is far more from its balance point
        bestRsId = rsId;
        highestDistance = loadDistance;
      }
      if (loadDistance != 0)
      {
        allRsWithZeroDistance = false;
      }
      int weight = bestServers.get(rsId).getWeight();
      if (weight > highestWeight)
      {
        // This server has a higher weight
        highestWeightRsId = rsId;
        highestWeight = weight;
      }
    }
    // All servers with a 0 distance ?
    if (allRsWithZeroDistance)
    {
      // Choose server with the highest weight
      bestRsId = highestWeightRsId;
    }
    return bestServers.get(bestRsId);
  }
  private static ReplicationServerInfo computeBestServerWhenConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
    if (currentLoadDistance < 0)
    {
      /*
       * Find the server with the current highest distance to its load goal and
       * choose it. Make an exception if every server is correctly balanced,
       * that is every current load distances are equal to 0, in that case,
       * choose the server with the highest weight
       */
      int bestRsId = 0; // If all server equal, return the first one
      float highestDistance = Float.NEGATIVE_INFINITY;
      boolean allRsWithZeroDistance = true;
      int highestWeightRsId = -1;
      int highestWeight = -1;
      Too much DSs connected to the current RS, compared with its load
      goal:
      Determine the potential number of DSs to disconnect from the current
      RS and see if the local DS is part of them: the DSs that must
      disconnect are those with the lowest server id.
      Compute the sum of the distances of the load goals of the other RSs
      */
      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
      for (Integer rsId : bestServers.keySet())
      {
        float loadDistance = loadDistances.get(rsId).floatValue();
        if (loadDistance > highestDistance)
        if (rsId != currentRsServerId)
        {
          // This server is far more from its balance point
          bestRsId = rsId;
          highestDistance = loadDistance;
        }
        if (loadDistance != 0)
        {
          allRsWithZeroDistance = false;
        }
        int weight = bestServers.get(rsId).getWeight();
        if (weight > highestWeight)
        {
          // This server has a higher weight
          highestWeightRsId = rsId;
          highestWeight = weight;
          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
            loadDistances.get(rsId), mathContext);
        }
      }
      // All servers with a 0 distance ?
      if (allRsWithZeroDistance)
      {
        // Choose server with the highest weight
        bestRsId = highestWeightRsId;
      }
      return bestServers.get(bestRsId);
    } else
    {
      // The local server is currently connected to a RS, let's see if it must
      // disconnect or not, taking the weights into account.
      float currentLoadDistance =
        loadDistances.get(currentRsServerId).floatValue();
      if (currentLoadDistance < 0)
      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
      {
        /*
        Too much DSs connected to the current RS, compared with its load
        goal:
        Determine the potential number of DSs to disconnect from the current
        RS and see if the local DS is part of them: the DSs that must
        disconnect are those with the lowest server id.
        Compute the sum of the distances of the load goals of the other RSs
        The average distance of the other RSs shows a lack of DSs.
        Compute the number of DSs to disconnect from the current RS,
        rounding to the nearest integer number. Do only this if there is
        no risk of yoyo effect: when the exact balance cannot be
        established due to the current number of DSs connected, do not
        disconnect a DS. A simple example where the balance cannot be
        reached is:
        - RS1 has weight 1 and 2 DSs
        - RS2 has weight 1 and 1 DS
        => disconnecting a DS from RS1 to reconnect it to RS2 would have no
        sense as this would lead to the reverse situation. In that case,
        the perfect balance cannot be reached and we must stick to the
        current situation, otherwise the DS would keep move between the 2
        RSs
        */
        BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
        for (Integer rsId : bestServers.keySet())
        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
          multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
              .floatValue();
        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
        // Avoid yoyo effect
        if (overloadingDSsNumber == 1)
        {
          if (rsId != currentRsServerId)
          {
            sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
              loadDistances.get(rsId), mathContext);
          }
        }
        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
        {
          /*
          The average distance of the other RSs shows a lack of DSs.
          Compute the number of DSs to disconnect from the current RS,
          rounding to the nearest integer number. Do only this if there is
          no risk of yoyo effect: when the exact balance cannot be
          established due to the current number of DSs connected, do not
          disconnect a DS. A simple example where the balance cannot be
          reached is:
          - RS1 has weight 1 and 2 DSs
          - RS2 has weight 1 and 1 DS
          => disconnecting a DS from RS1 to reconnect it to RS2 would have no
          sense as this would lead to the reverse situation. In that case,
          the perfect balance cannot be reached and we must stick to the
          current situation, otherwise the DS would keep move between the 2
          RSs
          */
          float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
            multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
                .floatValue();
          int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
          // Avoid yoyo effect
          if (overloadingDSsNumber == 1)
          {
            // What would be the new load distance for the current RS if
            // we disconnect some DSs ?
            ReplicationServerInfo currentReplicationServerInfo =
              bestServers.get(currentRsServerId);
            int currentRsWeight = currentReplicationServerInfo.getWeight();
            BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
            BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
            BigDecimal currentRsLoadGoalBd =
              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
            BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
            if (sumOfConnectedDSs != 0)
            {
              int connectedDSs = currentReplicationServerInfo.
                getConnectedDSNumber();
              BigDecimal potentialNewConnectedDSsBd =
                  BigDecimal.valueOf(connectedDSs - 1);
              BigDecimal sumOfConnectedDSsBd =
                  BigDecimal.valueOf(sumOfConnectedDSs);
              potentialCurrentRsNewLoadBd =
                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                  mathContext);
            }
            BigDecimal potentialCurrentRsNewLoadDistanceBd =
              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
                mathContext);
            // What would be the new load distance for the other RSs ?
            BigDecimal additionalDsLoadBd =
                BigDecimal.ONE.divide(
                    BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                    mathContext);
            /*
            Now compare both values: we must no disconnect the DS if this
            is for going in a situation where the load distance of the other
            RSs is the opposite of the future load distance of the local RS
            or we would evaluate that we should disconnect just after being
            arrived on the new RS. But we should disconnect if we reach the
            perfect balance (both values are 0).
            */
            MathContext roundMc =
              new MathContext(6, RoundingMode.DOWN);
            BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
              potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
            if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
              BigDecimal.ZERO) != 0)
              && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
              potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
            {
              // Avoid the yoyo effect, and keep the local DS connected to its
              // current RS
              return bestServers.get(currentRsServerId);
            }
          }
          // Prepare a sorted list (from lowest to highest) or DS server ids
          // connected to the current RS
          ReplicationServerInfo currentRsInfo =
          // What would be the new load distance for the current RS if
          // we disconnect some DSs ?
          ReplicationServerInfo currentReplicationServerInfo =
            bestServers.get(currentRsServerId);
          List<Integer> serversConnectedToCurrentRS =
            currentRsInfo.getConnectedDSs();
          List<Integer> sortedServers = new ArrayList<Integer>(
            serversConnectedToCurrentRS);
          Collections.sort(sortedServers);
          // Go through the list of DSs to disconnect and see if the local
          // server is part of them.
          int index = 0;
          while (overloadingDSsNumber > 0)
          int currentRsWeight = currentReplicationServerInfo.getWeight();
          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
          BigDecimal currentRsLoadGoalBd =
            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
          if (sumOfConnectedDSs != 0)
          {
            int severToDisconnectId = sortedServers.get(index);
            if (severToDisconnectId == localServerId)
            {
              // The local server is part of the DSs to disconnect
              return null;
            }
            overloadingDSsNumber--;
            index++;
            int connectedDSs = currentReplicationServerInfo.
              getConnectedDSNumber();
            BigDecimal potentialNewConnectedDSsBd =
                BigDecimal.valueOf(connectedDSs - 1);
            BigDecimal sumOfConnectedDSsBd =
                BigDecimal.valueOf(sumOfConnectedDSs);
            potentialCurrentRsNewLoadBd =
              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
          }
          BigDecimal potentialCurrentRsNewLoadDistanceBd =
            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);
          // The local server is not part of the servers to disconnect from the
          // current RS.
        } else {
          // The average distance of the other RSs does not show a lack of DSs:
          // no need to disconnect any DS from the current RS.
          // What would be the new load distance for the other RSs ?
          BigDecimal additionalDsLoadBd =
              BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                  mathContext);
          /*
          Now compare both values: we must not disconnect the DS if this
          is for going in a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS
          or we would evaluate that we should disconnect just after being
          arrived on the new RS. But we should disconnect if we reach the
          perfect balance (both values are 0).
          */
          MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
          BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
            potentialCurrentRsNewLoadDistanceBd.round(roundMc);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
            potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
          if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
            BigDecimal.ZERO) != 0)
            && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
            potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            return bestServers.get(currentRsServerId);
          }
        }
        // Prepare a sorted list (from lowest to highest) of DS server ids
        // connected to the current RS
        ReplicationServerInfo currentRsInfo =
          bestServers.get(currentRsServerId);
        List<Integer> serversConnectedToCurrentRS =
            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
        Collections.sort(serversConnectedToCurrentRS);
        // Go through the list of DSs to disconnect and see if the local
        // server is part of them.
        int index = 0;
        while (overloadingDSsNumber > 0)
        {
          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
          if (serverIdToDisconnect == localServerId)
          {
            // The local server is part of the DSs to disconnect
            return null;
          }
          overloadingDSsNumber--;
          index++;
        }
        // The local server is not part of the servers to disconnect from the
        // current RS.
      } else {
        // The RS load goal is reached or there are not enough DSs connected to
        // it to reach it: do not disconnect from this RS and return rsInfo for
        // this RS
        // The average distance of the other RSs does not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
      }
      return bestServers.get(currentRsServerId);
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
    }
    return bestServers.get(currentRsServerId);
  }
  /**
@@ -2117,7 +2115,7 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false, true);
    publish(msg, false, true);
  }
  /**
@@ -2128,7 +2126,7 @@
   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return _publish(msg, false, retryOnFailure);
    return publish(msg, false, retryOnFailure);
  }
  /**
@@ -2137,7 +2135,7 @@
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true, true);
    publish(msg, true, true);
  }
  /**
@@ -2147,7 +2145,7 @@
   * @param retryOnFailure whether retry should be done on failure
   * @return whether the message was successfully sent.
   */
  boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
      boolean retryOnFailure)
  {
    boolean done = false;
@@ -2175,10 +2173,6 @@
      try
      {
        boolean credit;
        Session current_session;
        Semaphore currentWindowSemaphore;
        /*
        save the session at the time when we acquire the
        sendwindow credit so that we can make sure later
@@ -2187,9 +2181,11 @@
        on a session with a credit that was acquired from a previous
        session.
        */
        Session currentSession;
        Semaphore currentWindowSemaphore;
        synchronized (connectPhaseLock)
        {
          current_session = session;
          currentSession = session;
          currentWindowSemaphore = sendWindow;
        }
@@ -2204,6 +2200,7 @@
          return false;
        }
        boolean credit;
        if (msg instanceof UpdateMsg)
        {
          /*
@@ -2217,6 +2214,7 @@
        {
          credit = true;
        }
        if (credit)
        {
          synchronized (connectPhaseLock)
@@ -2228,8 +2226,7 @@
            reconnection happened and we need to restart from scratch.
            */
            if ((session != null) &&
                (session == current_session))
            if (session != null && session == currentSession)
            {
              session.publish(msg);
              done = true;
@@ -2340,7 +2337,7 @@
        break;
      }
      final int replicationServerID = rsServerId;
      final int previousRsServerID = rsServerId;
      try
      {
        ReplicationMsg msg = savedSession.receive();
@@ -2375,7 +2372,7 @@
        {
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              replicationServerID, savedSession.getReadableRemoteAddress(),
              previousRsServerID, savedSession.getReadableRemoteAddress(),
              serverId, baseDN.toNormalizedString());
          logError(message);
@@ -2425,13 +2422,12 @@
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              ReplicationServerInfo bestServerInfo =
                computeBestReplicationServer(false, rsServerId, state,
                replicationServerInfos, serverId, groupId,
                generationID);
              if ((rsServerId != -1) && ((bestServerInfo == null) ||
                (bestServerInfo.getServerId() != rsServerId)))
              final ReplicationServerInfo bestServerInfo =
                  computeBestReplicationServer(false, previousRsServerID, state,
                      replicationServerInfos, serverId, groupId, generationID);
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
              {
                // The best replication server is no more the one we are
                // currently using. Disconnect properly then reconnect.
@@ -2439,14 +2435,14 @@
                if (bestServerInfo == null)
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      baseDN.toNormalizedString());
                }
                else
                {
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
@@ -2483,7 +2479,7 @@
          {
            // We did not initiate the close on our side, log an error message.
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                serverId, baseDN.toNormalizedString(), replicationServerID,
                serverId, baseDN.toNormalizedString(), previousRsServerID,
                savedSession.getReadableRemoteAddress());
            logError(message);
          }
@@ -2684,9 +2680,8 @@
   *                            requires to restart the service.
   * @param groupId            The new group id to use
   */
  public boolean changeConfig(
    Collection<String> replicationServers, int window, long heartbeatInterval,
    byte groupId)
  public boolean changeConfig(Set<String> replicationServers, int window,
      long heartbeatInterval, byte groupId)
  {
    // These parameters need to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
@@ -2695,8 +2690,7 @@
    // the connection is modified
    boolean needToRestartSession =
        this.replicationServerUrls == null
        || replicationServers.size() != this.replicationServerUrls.size()
        || !replicationServers.containsAll(this.replicationServerUrls)
        || !replicationServers.equals(this.replicationServerUrls)
        || window != this.maxRcvWindow
        || heartbeatInterval != this.heartbeatInterval
        || groupId != this.groupId;
@@ -2788,12 +2782,10 @@
   */
  public List<RSInfo> getRsList()
  {
    List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo replicationServerInfo :
      replicationServerInfos.values())
    final List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
    {
      result.add(replicationServerInfo.toRSInfo());
      result.add(rsInfo.toRSInfo());
    }
    return result;
  }
@@ -2989,14 +2981,14 @@
  }
  /**
   * Returns the replication monitor associated with this broker.
   * Returns the replication monitor instance name associated with this broker.
   *
   * @return The replication monitor.
   * @return The replication monitor instance name.
   */
  ReplicationMonitor getReplicationMonitor()
  String getReplicationMonitorInstanceName()
  {
    // Only invoked by replication domain so always non-null.
    return monitor;
    return monitor.getMonitorInstanceName();
  }
  private void setSession(final Session newSession)
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1478,7 +1478,7 @@
      }
      if (debugEnabled())
        TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName()
        TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName()
            + " export ends with " + " connected=" + broker.isConnected()
            + " exportRootException=" + exportRootException);
@@ -1585,11 +1585,6 @@
    }
  }
  private String getReplicationMonitorInstanceName()
  {
    return broker.getReplicationMonitor().getMonitorInstanceName();
  }
  /**
   * For all remote servers in the start list:
   * - wait it has finished the import and present the expected generationID,
@@ -1835,7 +1830,8 @@
        msg = broker.receive(false, false, true);
        if (debugEnabled())
          TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName()
          TRACER.debugInfo("[IE] In "
              + broker.getReplicationMonitorInstanceName()
              + ", receiveEntryBytes " + msg);
        if (msg == null)
@@ -1888,7 +1884,7 @@
              broker.publish(amsg, false);
              if (debugEnabled())
                TRACER.debugInfo("[IE] In "
                    + getReplicationMonitorInstanceName()
                    + broker.getReplicationMonitorInstanceName()
                    + ", publish InitializeRcvAckMsg" + amsg);
            }
          }
@@ -2072,13 +2068,12 @@
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
          + Arrays.toString(lDIFEntry));
    // publish the message
    boolean sent = broker.publish(entryMessage, false);
    // process any publish error
    if (((!sent)||
        (broker.hasConnectionError()))||
        (broker.getNumLostConnections() != ieContext.initNumLostConnections))
    if (!sent
        || broker.hasConnectionError()
        || broker.getNumLostConnections() != ieContext.initNumLostConnections)
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2125,8 +2120,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeFromRemote(int source)
  throws DirectoryException
  public void initializeFromRemote(int source) throws DirectoryException
  {
    initializeFromRemote(source, null);
  }
@@ -2966,8 +2960,7 @@
   * @throws ConfigException     If the DirectoryServer configuration was
   *                             incorrect.
   */
  public void startPublishService(
      Collection<String> replicationServers, int window,
  public void startPublishService(Set<String> replicationServers, int window,
      long heartbeatInterval, long changetimeHeartbeatInterval)
  throws ConfigException
  {
@@ -3078,18 +3071,15 @@
  /**
   * Change some ReplicationDomain parameters.
   *
   * @param replicationServers  The new list of Replication Servers that this
   * @param replicationServers  The new set of Replication Servers that this
   *                           domain should now use.
   * @param windowSize         The window size that this domain should use.
   * @param heartbeatInterval  The heartbeatInterval that this domain should
   *                           use.
   * @param groupId            The new group id to use
   */
  public void changeConfig(
      Collection<String> replicationServers,
      int windowSize,
      long heartbeatInterval,
      byte groupId)
  public void changeConfig(Set<String> replicationServers, int windowSize,
      long heartbeatInterval, byte groupId)
  {
    this.groupId = groupId;
@@ -3576,15 +3566,13 @@
      Set<String> s2 = new HashSet<String>(s1);
      s2.addAll(includeAttributesForDeletes);
      Set<String> s = eclIncludesByServer.get(serverId);
      if (!s1.equals(s))
      if (!s1.equals(eclIncludesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      }
      s = eclIncludesForDeletesByServer.get(serverId);
      if (!s2.equals(s))
      if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer.put(serverId,
@@ -3592,7 +3580,7 @@
      }
      // and rebuild the global list to be ready for usage
      s = new HashSet<String>();
      Set<String> s = new HashSet<String>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        s.addAll(attributes);
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1922,4 +1922,20 @@
    dsconfig("set-backend-prop", "--backend-name", backendID,
             "--set", "enabled:" + enabled);
  }
  public static <T> Set<T> newSet(T... elems)
  {
    return new HashSet<T>(Arrays.asList(elems));
  }
  public static <T> SortedSet<T> newSortedSet(T... elems)
  {
    return new TreeSet<T>(Arrays.asList(elems));
  }
  public static <T> List<T> newList(T... elems)
  {
    return new ArrayList<T>(Arrays.asList(elems));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -236,7 +236,7 @@
  private void connect(ReplicationBroker broker, int port, int timeout) throws Exception
  {
    broker.start(Collections.singletonList("localhost:" + port));
    broker.start(Collections.singleton("localhost:" + port));
    // give some time to the broker to connect to the replicationServer.
    checkConnection(30, broker, port);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -375,8 +375,7 @@
  private void createFakeReplicationDomain(boolean firstBackend,
      long generationId) throws Exception
  {
    List<String> replicationServers = new ArrayList<String>();
    replicationServers.add("localhost:" + replServerPort);
    Set<String> replicationServers = newSet("localhost:" + replServerPort);
    DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
    replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId);
@@ -566,13 +565,9 @@
    private int exportedEntryCount;
    private long generationID = -1;
    public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
      long generationId) throws ConfigException
    public FakeReplicationDomain(DN baseDN, int serverID,
        Set<String> replicationServers, int window, long heartbeatInterval,
        long generationId) throws ConfigException
    {
      super(baseDN, serverID, 100);
      generationID = generationId;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,10 +29,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
@@ -229,9 +226,7 @@
    ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
    ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_,
        dsId, 100, generationId, 0, security, (byte) 1, 500);
    List<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + rs1Port);
    broker.start(servers);
    broker.start(Collections.singleton("localhost:" + rs1Port));
    checkConnection(30, broker, rs1Port);
    return broker;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -904,16 +904,6 @@
    };
  }
  private <T> Set<T> newSet(T... elems)
  {
    return new HashSet<T>(Arrays.asList(elems));
  }
  private <T> List<T> newList(T... elems)
  {
    return Arrays.asList(elems);
  }
  /**
   * Test TopologyMsg encoding and decoding.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -300,18 +300,16 @@
        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
        scenario, serverState);
      List<String> replicationServers = new ArrayList<String>();
      replicationServers.add("localhost:" + rsPort);
    Set<String> replicationServers = newSet("localhost:" + rsPort);
      fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500);
      if (startListen)
        fakeReplicationDomain.startListenService();
      // Test connection
      assertTrue(fakeReplicationDomain.isConnected());
      // Check connected server port
    HostPort rd =
        HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
      assertEquals(rd.getPort(), rsPort);
    // Check connected server port
    HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
    assertEquals(rd.getPort(), rsPort);
      return fakeReplicationDomain;
  }
@@ -1253,17 +1251,9 @@
    // Fake RS 3 scenario
    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
    Object[][] result = new Object[objectArrayList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    {
      result[i] = objectArray.toArray();
      i++;
    }
    debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + i);
    return result;
    debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : "
        + objectArrayList.size());
    return toDataProvider(objectArrayList);
  }
  /**
@@ -1862,11 +1852,16 @@
    // Fake RS sends update in assured mode
    objectArrayList = addPossibleParameters(objectArrayList, true, false);
    Object[][] result = new Object[objectArrayList.size()][];
    return toDataProvider(objectArrayList);
  }
  private Object[][] toDataProvider(List<List<Object>> listOfList)
  {
    Object[][] result = new Object[listOfList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    for (List<Object> list : listOfList)
    {
      result[i] = objectArray.toArray();
      result[i] = list.toArray();
      i++;
    }
    return result;
@@ -2292,14 +2287,7 @@
    // Other additional RS scenario
    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ);
    Object[][] result = new Object[objectArrayList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    {
      result[i] = objectArray.toArray();
      i++;
    }
    return result;
    return toDataProvider(objectArrayList);
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -616,7 +616,7 @@
      // Add the root entry in the backend
      backend2 = initializeTestBackend(false, backendId2);
      backend2.setPrivateBackend(true);
      SortedSet<String> replServers = newSet("localhost:" + replicationServerPort);
      SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers);
      domain2 = startNewDomain(domainConf, null,null);
@@ -2870,10 +2870,10 @@
      // Add the root entry in the backend
      backend2 = initializeTestBackend(false, TEST_BACKEND_ID2);
      SortedSet<String> replServers = newSet("localhost:" + replicationServerPort);
      SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
      // on o=test2,sid=1702 include attrs set to : 'sn'
      SortedSet<String> eclInclude = newSet("sn", "roomnumber");
      SortedSet<String> eclInclude = newSortedSet("sn", "roomnumber");
      DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1702, replServers);
      domain2 = startNewDomain(domainConf, eclInclude, eclInclude);
@@ -2881,15 +2881,15 @@
      backend3 = initializeTestBackend(false, backendId3);
      // on o=test3,sid=1703 include attrs set to : 'objectclass'
      eclInclude = newSet("objectclass");
      eclInclude = newSortedSet("objectclass");
      SortedSet<String> eclIncludeForDeletes = newSet("*");
      SortedSet<String> eclIncludeForDeletes = newSortedSet("*");
      domainConf = new DomainFakeCfg(baseDN3, 1703, replServers);
      domain3 = startNewDomain(domainConf, eclInclude, eclIncludeForDeletes);
      // on o=test2,sid=1704 include attrs set to : 'cn'
      eclInclude = newSet("cn");
      eclInclude = newSortedSet("cn");
      domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
      domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
@@ -3021,11 +3021,6 @@
    }
  }
  private static SortedSet<String> newSet(String... values)
  {
    return new TreeSet<String>(Arrays.asList(values));
  }
  private LDAPReplicationDomain startNewDomain(DomainFakeCfg domainConf,
      SortedSet<String> eclInclude, SortedSet<String> eclIncludeForDeletes)
      throws Exception
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -30,7 +30,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,12 +68,8 @@
  private long generationID = 1;
  public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
  public FakeReplicationDomain(DN baseDN, int serverID,
      Set<String> replicationServers, int window, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(baseDN, serverID, 100);
@@ -82,15 +78,10 @@
    this.queue = queue;
  }
  public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
      String exportString,
      StringBuilder importString,
      int exportedEntryCount) throws ConfigException
  public FakeReplicationDomain(DN baseDN, int serverID,
      Set<String> replicationServers, int window, long heartbeatInterval,
      String exportString, StringBuilder importString, int exportedEntryCount)
      throws ConfigException
  {
    super(baseDN, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -27,12 +27,10 @@
 */
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +40,8 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
/**
 * This class is the minimum implementation of a Concrete ReplicationDomain
 * used to test the Generic Replication Service.
@@ -55,12 +55,8 @@
   */
  private BlockingQueue<UpdateMsg> queue = null;
  public FakeStressReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
  public FakeStressReplicationDomain(DN baseDN, int serverID,
      Set<String> replicationServers, int window, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(baseDN, serverID, 100);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -45,6 +45,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.testng.Assert.*;
/**
@@ -92,16 +93,13 @@
      replServer2 = createReplicationServer(replServerID2, replServerPort2,
          "ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1);
      List<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort1);
      Set<String> servers = newSet("localhost:" + replServerPort1);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
          testService, domain1ServerId, servers, 100, 1000, rcvQueue1);
      List<String> servers2 = new ArrayList<String>(1);
      servers2.add("localhost:" + replServerPort2);
      Set<String> servers2 = newSet("localhost:" + replServerPort2);
      BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
      domain2 = new FakeReplicationDomain(
          testService, domain2ServerId, servers2, 100, 1000, rcvQueue2);
@@ -218,9 +216,7 @@
      replServer1 = createReplicationServer(replServerID1, replServerPort,
          "ReplicationDomainTestDb", 100000, "localhost:" + replServerPort);
      List<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      Set<String> servers = newSet("localhost:" + replServerPort);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
          testService, domain1ServerId, servers, 1000, 100000, rcvQueue1);
@@ -321,8 +317,7 @@
      replServer = createReplicationServer(replServerID, replServerPort,
          "exportAndImportData", 100);
      List<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      Set<String> servers = newSet("localhost:" + replServerPort);
      StringBuilder exportedDataBuilder = new StringBuilder();
      for (int i =0; i<ENTRYCOUNT; i++)
@@ -399,11 +394,8 @@
      replServer2 = createReplicationServer(replServerID2, replServerPort2,
          "exportAndImportservice2", 100, "localhost:" + replServerPort1);
      List<String> servers1 = new ArrayList<String>(1);
      servers1.add("localhost:" + replServerPort1);
      List<String> servers2 = new ArrayList<String>(1);
      servers2.add("localhost:" + replServerPort2);
      Set<String> servers1 = newSet("localhost:" + replServerPort1);
      Set<String> servers2 = newSet("localhost:" + replServerPort2);
      StringBuilder exportedDataBuilder = new StringBuilder();
      for (int i =0; i<ENTRYCOUNT; i++)