mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.27.2013 4bb3d3af3d032a98f2ca318c81be5c4f81034b8f
ReplicationBroker.java:
In computeInitialServerStatus(), used early exits.
In computeBestServerForWeight(), extracted methods computeBestServerWhenNotConnected() and computeBestServerWhenConnected().
Changed replicationServerUrls from Collection<String> to Set<String>.
Removed useless field initialization to null.
Renamed _publish() to publish() + reduced local variables scope.
In receive(), renamed local variable replicationServerID to previousRsServerID and used this one more rather than the field.
In changeConfig(), used Set.equals().
Changed getReplicationMonitor() to getReplicationMonitorInstanceName().

ReplicationDomain.java:
Consequence of the change to ReplicationBroker.getReplicationMonitor(), inlined getReplicationMonitorInstanceName().
Consequence of the change to ReplicationBroker.replicationServerUrls.

TestCaseUtils.java:
Moved newSet(), newSortedSet(), newList() here from test classes.

*Test.java:
Consequence of the changes to ReplicationBroker.replicationServerUrls.
12 files modified
396 ■■■■■ changed files
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 188 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 42 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java 16 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java 9 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 10 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 42 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 17 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 23 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 22 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,7 +31,10 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.*;
import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +75,7 @@
  /**
   * Replication server URLs under this format: "<code>hostname:port</code>".
   */
  private volatile Collection<String> replicationServerUrls;
  private volatile Set<String> replicationServerUrls;
  private volatile boolean connected = false;
  /**
   * String reported under CSN=monitor when there is no connected RS.
@@ -98,9 +101,9 @@
  /** The server id of the RS we are connected to. */
  private Integer rsServerId = -1;
  /** The server URL of the RS we are connected to. */
  private String rsServerUrl = null;
  private String rsServerUrl;
  /** Our replication domain. */
  private ReplicationDomain domain = null;
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
@@ -121,7 +124,7 @@
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  private HeartbeatMonitor heartbeatMonitor;
  /**
   * The number of times the connection was lost.
   */
@@ -140,7 +143,7 @@
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /**
   * The expected period in milliseconds between these messages are sent
   * to the replication server. Zero means heartbeats are off.
@@ -149,8 +152,11 @@
  /*
   * Properties for the last topology info received from the network.
   */
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  /**
   * Info for other DSs.
   * <p>
   * Warning: does not contain info for us (for our server id)
   */
  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
  private volatile long generationID;
  private volatile int updateDoneCount = 0;
@@ -162,8 +168,7 @@
   * replication server one wants to connect. Key: replication server id Value:
   * replication server info for the matching replication server id
   */
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
    = null;
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
  /**
   * This integer defines when the best replication server checking algorithm
@@ -256,7 +261,7 @@
   *
   * @param replicationServers list of servers used
   */
  public void start(Collection<String> replicationServers)
  public void start(Set<String> replicationServers)
  {
    synchronized (startStopLock)
    {
@@ -1068,9 +1073,13 @@
    {
      // RS has no generation id
      return ServerStatus.NORMAL_STATUS;
    } else
    }
    else if (rsGenId != dsGenId)
    {
      if (rsGenId == dsGenId)
      // DS and RS do not have same generation id
      return ServerStatus.BAD_GEN_ID_STATUS;
    }
    else
      {
        /*
        DS and RS have same generation id
@@ -1082,9 +1091,7 @@
        value to determine if we are late or not
        */
        ServerStatus initStatus;
        int nChanges = ServerState.diffChanges(rsState, state);
        if (debugEnabled())
        {
          TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
@@ -1098,30 +1105,13 @@
        the changeStatusFromStatusAnalyzer method. This allows to take the
        lock roughly only when needed versus every sleep time timeout.
        */
        if (degradedStatusThreshold > 0)
      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
        {
          if (nChanges >= degradedStatusThreshold)
          {
            initStatus = ServerStatus.DEGRADED_STATUS;
          } else
          {
            initStatus = ServerStatus.NORMAL_STATUS;
        return ServerStatus.DEGRADED_STATUS;
          }
        } else
        {
          /*
          0 threshold value means no degrading system used (no threshold):
          force normal status
          */
          initStatus = ServerStatus.NORMAL_STATUS;
        }
        return initStatus;
      } else
      {
        // DS and RS do not have same generation id
        return ServerStatus.BAD_GEN_ID_STATUS;
      }
      // degradedStatusThreshold value of '0' means no degrading system used
      // (no threshold): force normal status
      return ServerStatus.NORMAL_STATUS;
    }
  }
@@ -1476,16 +1466,12 @@
      {
        // We are not connected to a server yet
        return computeBestServerForWeight(bestServers, -1, -1);
      } else
      {
        /*
        We are already connected to a RS: compute the best RS as far as the
        weights is concerned. If this is another one, some DS must
        disconnect.
        */
        return computeBestServerForWeight(bestServers, rsServerId,
          localServerId);
      }
    /*
     * We are already connected to a RS: compute the best RS as far as the
     * weights is concerned. If this is another one, some DS must disconnect.
     */
    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
  }
  /**
@@ -1783,11 +1769,12 @@
      sumOfWeights += replicationServerInfo.getWeight();
      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
    }
    // Distance (difference) of the current loads to the load goals of each RS:
    // key:server id, value: distance
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the dot)
    MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
@@ -1812,8 +1799,20 @@
    if (currentRsServerId == -1)
    {
      // The local server is not connected yet
      // The local server is not connected yet, find best server to connect to,
      // taking the weights into account.
      return computeBestServerWhenNotConnected(bestServers, loadDistances);
    }
    // The local server is currently connected to a RS, let's see if it must
    // disconnect or not, taking the weights into account.
    return computeBestServerWhenConnected(bestServers, loadDistances,
        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
  }
  private static ReplicationServerInfo computeBestServerWhenNotConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances)
  {
      /*
       * Find the server with the current highest distance to its load goal and
       * choose it. Make an exception if every server is correctly balanced,
@@ -1853,11 +1852,14 @@
        bestRsId = highestWeightRsId;
      }
      return bestServers.get(bestRsId);
    } else
    {
      // The local server is currently connected to a RS, let's see if it must
      // disconnect or not, taking the weights into account.
  }
  private static ReplicationServerInfo computeBestServerWhenConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
      float currentLoadDistance =
        loadDistances.get(currentRsServerId).floatValue();
      if (currentLoadDistance < 0)
@@ -1949,8 +1951,7 @@
            arrived on the new RS. But we should disconnect if we reach the
            perfect balance (both values are 0).
            */
            MathContext roundMc =
              new MathContext(6, RoundingMode.DOWN);
          MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
            BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
@@ -1972,18 +1973,16 @@
          ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
          List<Integer> serversConnectedToCurrentRS =
            currentRsInfo.getConnectedDSs();
          List<Integer> sortedServers = new ArrayList<Integer>(
            serversConnectedToCurrentRS);
          Collections.sort(sortedServers);
            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
        Collections.sort(serversConnectedToCurrentRS);
          // Go through the list of DSs to disconnect and see if the local
          // server is part of them.
          int index = 0;
          while (overloadingDSsNumber > 0)
          {
            int severToDisconnectId = sortedServers.get(index);
            if (severToDisconnectId == localServerId)
          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
          if (serverIdToDisconnect == localServerId)
            {
              // The local server is part of the DSs to disconnect
              return null;
@@ -2005,7 +2004,6 @@
      }
      return bestServers.get(currentRsServerId);
    }
  }
  /**
   * Start the heartbeat monitor thread.
@@ -2117,7 +2115,7 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false, true);
    publish(msg, false, true);
  }
  /**
@@ -2128,7 +2126,7 @@
   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return _publish(msg, false, retryOnFailure);
    return publish(msg, false, retryOnFailure);
  }
  /**
@@ -2137,7 +2135,7 @@
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true, true);
    publish(msg, true, true);
  }
  /**
@@ -2147,7 +2145,7 @@
   * @param retryOnFailure whether retry should be done on failure
   * @return whether the message was successfully sent.
   */
  boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
      boolean retryOnFailure)
  {
    boolean done = false;
@@ -2175,10 +2173,6 @@
      try
      {
        boolean credit;
        Session current_session;
        Semaphore currentWindowSemaphore;
        /*
        save the session at the time when we acquire the
        sendwindow credit so that we can make sure later
@@ -2187,9 +2181,11 @@
        on a session with a credit that was acquired from a previous
        session.
        */
        Session currentSession;
        Semaphore currentWindowSemaphore;
        synchronized (connectPhaseLock)
        {
          current_session = session;
          currentSession = session;
          currentWindowSemaphore = sendWindow;
        }
@@ -2204,6 +2200,7 @@
          return false;
        }
        boolean credit;
        if (msg instanceof UpdateMsg)
        {
          /*
@@ -2217,6 +2214,7 @@
        {
          credit = true;
        }
        if (credit)
        {
          synchronized (connectPhaseLock)
@@ -2228,8 +2226,7 @@
            reconnection happened and we need to restart from scratch.
            */
            if ((session != null) &&
                (session == current_session))
            if (session != null && session == currentSession)
            {
              session.publish(msg);
              done = true;
@@ -2340,7 +2337,7 @@
        break;
      }
      final int replicationServerID = rsServerId;
      final int previousRsServerID = rsServerId;
      try
      {
        ReplicationMsg msg = savedSession.receive();
@@ -2375,7 +2372,7 @@
        {
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              replicationServerID, savedSession.getReadableRemoteAddress(),
              previousRsServerID, savedSession.getReadableRemoteAddress(),
              serverId, baseDN.toNormalizedString());
          logError(message);
@@ -2425,13 +2422,12 @@
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              ReplicationServerInfo bestServerInfo =
                computeBestReplicationServer(false, rsServerId, state,
                replicationServerInfos, serverId, groupId,
                generationID);
              if ((rsServerId != -1) && ((bestServerInfo == null) ||
                (bestServerInfo.getServerId() != rsServerId)))
              final ReplicationServerInfo bestServerInfo =
                  computeBestReplicationServer(false, previousRsServerID, state,
                      replicationServerInfos, serverId, groupId, generationID);
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
              {
                // The best replication server is no more the one we are
                // currently using. Disconnect properly then reconnect.
@@ -2439,14 +2435,14 @@
                if (bestServerInfo == null)
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      baseDN.toNormalizedString());
                }
                else
                {
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
@@ -2483,7 +2479,7 @@
          {
            // We did not initiate the close on our side, log an error message.
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                serverId, baseDN.toNormalizedString(), replicationServerID,
                serverId, baseDN.toNormalizedString(), previousRsServerID,
                savedSession.getReadableRemoteAddress());
            logError(message);
          }
@@ -2684,9 +2680,8 @@
   *                            requires to restart the service.
   * @param groupId            The new group id to use
   */
  public boolean changeConfig(
    Collection<String> replicationServers, int window, long heartbeatInterval,
    byte groupId)
  public boolean changeConfig(Set<String> replicationServers, int window,
      long heartbeatInterval, byte groupId)
  {
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
@@ -2695,8 +2690,7 @@
    // the connection is modified
    boolean needToRestartSession =
        this.replicationServerUrls == null
        || replicationServers.size() != this.replicationServerUrls.size()
        || !replicationServers.containsAll(this.replicationServerUrls)
        || !replicationServers.equals(this.replicationServerUrls)
        || window != this.maxRcvWindow
        || heartbeatInterval != this.heartbeatInterval
        || groupId != this.groupId;
@@ -2788,12 +2782,10 @@
   */
  public List<RSInfo> getRsList()
  {
    List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo replicationServerInfo :
      replicationServerInfos.values())
    final List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
    {
      result.add(replicationServerInfo.toRSInfo());
      result.add(rsInfo.toRSInfo());
    }
    return result;
  }
@@ -2989,14 +2981,14 @@
  }
  /**
   * Returns the replication monitor associated with this broker.
   * Returns the replication monitor instance name associated with this broker.
   *
   * @return The replication monitor.
   * @return The replication monitor instance name.
   */
  ReplicationMonitor getReplicationMonitor()
  String getReplicationMonitorInstanceName()
  {
    // Only invoked by replication domain so always non-null.
    return monitor;
    return monitor.getMonitorInstanceName();
  }
  private void setSession(final Session newSession)
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1478,7 +1478,7 @@
      }
      if (debugEnabled())
        TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName()
        TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName()
            + " export ends with " + " connected=" + broker.isConnected()
            + " exportRootException=" + exportRootException);
@@ -1585,11 +1585,6 @@
    }
  }
  private String getReplicationMonitorInstanceName()
  {
    return broker.getReplicationMonitor().getMonitorInstanceName();
  }
  /**
   * For all remote servers in the start list:
   * - wait it has finished the import and present the expected generationID,
@@ -1835,7 +1830,8 @@
        msg = broker.receive(false, false, true);
        if (debugEnabled())
          TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName()
          TRACER.debugInfo("[IE] In "
              + broker.getReplicationMonitorInstanceName()
              + ", receiveEntryBytes " + msg);
        if (msg == null)
@@ -1888,7 +1884,7 @@
              broker.publish(amsg, false);
              if (debugEnabled())
                TRACER.debugInfo("[IE] In "
                    + getReplicationMonitorInstanceName()
                    + broker.getReplicationMonitorInstanceName()
                    + ", publish InitializeRcvAckMsg" + amsg);
            }
          }
@@ -2072,13 +2068,12 @@
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
          + Arrays.toString(lDIFEntry));
    // publish the message
    boolean sent = broker.publish(entryMessage, false);
    // process any publish error
    if (((!sent)||
        (broker.hasConnectionError()))||
        (broker.getNumLostConnections() != ieContext.initNumLostConnections))
    if (!sent
        || broker.hasConnectionError()
        || broker.getNumLostConnections() != ieContext.initNumLostConnections)
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2125,8 +2120,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeFromRemote(int source)
  throws DirectoryException
  public void initializeFromRemote(int source) throws DirectoryException
  {
    initializeFromRemote(source, null);
  }
@@ -2966,8 +2960,7 @@
   * @throws ConfigException     If the DirectoryServer configuration was
   *                             incorrect.
   */
  public void startPublishService(
      Collection<String> replicationServers, int window,
  public void startPublishService(Set<String> replicationServers, int window,
      long heartbeatInterval, long changetimeHeartbeatInterval)
  throws ConfigException
  {
@@ -3078,18 +3071,15 @@
  /**
   * Change some ReplicationDomain parameters.
   *
   * @param replicationServers  The new list of Replication Servers that this
   * @param replicationServers  The new set of Replication Servers that this
   *                           domain should now use.
   * @param windowSize         The window size that this domain should use.
   * @param heartbeatInterval  The heartbeatInterval that this domain should
   *                           use.
   * @param groupId            The new group id to use
   */
  public void changeConfig(
      Collection<String> replicationServers,
      int windowSize,
      long heartbeatInterval,
      byte groupId)
  public void changeConfig(Set<String> replicationServers, int windowSize,
      long heartbeatInterval, byte groupId)
  {
    this.groupId = groupId;
@@ -3576,15 +3566,13 @@
      Set<String> s2 = new HashSet<String>(s1);
      s2.addAll(includeAttributesForDeletes);
      Set<String> s = eclIncludesByServer.get(serverId);
      if (!s1.equals(s))
      if (!s1.equals(eclIncludesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      }
      s = eclIncludesForDeletesByServer.get(serverId);
      if (!s2.equals(s))
      if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer.put(serverId,
@@ -3592,7 +3580,7 @@
      }
      // and rebuild the global list to be ready for usage
      s = new HashSet<String>();
      Set<String> s = new HashSet<String>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        s.addAll(attributes);
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1922,4 +1922,20 @@
    dsconfig("set-backend-prop", "--backend-name", backendID,
             "--set", "enabled:" + enabled);
  }
  /**
   * Builds a new {@link HashSet} holding the provided elements.
   * <p>
   * The returned set is a fresh, modifiable instance: the elements are copied
   * out of the varargs array, so later changes to the set do not affect the
   * array. Duplicate elements are collapsed and iteration order is
   * unspecified, as for any {@link HashSet}.
   *
   * @param <T>
   *          the type of the elements
   * @param elems
   *          the elements to put in the set, may be empty
   * @return a new modifiable set containing the provided elements
   */
  public static <T> Set<T> newSet(T... elems)
  {
    return new HashSet<T>(Arrays.asList(elems));
  }
  /**
   * Builds a new {@link TreeSet} holding the provided elements.
   * <p>
   * No comparator is supplied, so the elements are ordered by their natural
   * ordering and must therefore be mutually comparable. The returned set is a
   * fresh, modifiable instance and duplicate elements are collapsed.
   *
   * @param <T>
   *          the type of the elements
   * @param elems
   *          the elements to put in the set, may be empty
   * @return a new modifiable sorted set containing the provided elements
   */
  public static <T> SortedSet<T> newSortedSet(T... elems)
  {
    return new TreeSet<T>(Arrays.asList(elems));
  }
  /**
   * Builds a new {@link ArrayList} holding the provided elements, in the
   * order they were given.
   * <p>
   * The elements are copied into a fresh {@link ArrayList} rather than
   * returning the fixed-size view produced by {@link Arrays#asList(Object...)}
   * directly, so callers may add to or remove from the returned list.
   *
   * @param <T>
   *          the type of the elements
   * @param elems
   *          the elements to put in the list, may be empty
   * @return a new modifiable list containing the provided elements
   */
  public static <T> List<T> newList(T... elems)
  {
    return new ArrayList<T>(Arrays.asList(elems));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -236,7 +236,7 @@
  private void connect(ReplicationBroker broker, int port, int timeout) throws Exception
  {
    broker.start(Collections.singletonList("localhost:" + port));
    broker.start(Collections.singleton("localhost:" + port));
    // give some time to the broker to connect to the replicationServer.
    checkConnection(30, broker, port);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -375,8 +375,7 @@
  private void createFakeReplicationDomain(boolean firstBackend,
      long generationId) throws Exception
  {
    List<String> replicationServers = new ArrayList<String>();
    replicationServers.add("localhost:" + replServerPort);
    Set<String> replicationServers = newSet("localhost:" + replServerPort);
    DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
    replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId);
@@ -566,12 +565,8 @@
    private int exportedEntryCount;
    private long generationID = -1;
    public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
    public FakeReplicationDomain(DN baseDN, int serverID,
        Set<String> replicationServers, int window, long heartbeatInterval,
      long generationId) throws ConfigException
    {
      super(baseDN, serverID, 100);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,10 +29,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
@@ -229,9 +226,7 @@
    ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
    ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_,
        dsId, 100, generationId, 0, security, (byte) 1, 500);
    List<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + rs1Port);
    broker.start(servers);
    broker.start(Collections.singleton("localhost:" + rs1Port));
    checkConnection(30, broker, rs1Port);
    return broker;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -904,16 +904,6 @@
    };
  }
  private <T> Set<T> newSet(T... elems)
  {
    return new HashSet<T>(Arrays.asList(elems));
  }
  private <T> List<T> newList(T... elems)
  {
    return Arrays.asList(elems);
  }
  /**
   * Test TopologyMsg encoding and decoding.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -300,8 +300,7 @@
        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
        scenario, serverState);
      List<String> replicationServers = new ArrayList<String>();
      replicationServers.add("localhost:" + rsPort);
    Set<String> replicationServers = newSet("localhost:" + rsPort);
      fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500);
      if (startListen)
        fakeReplicationDomain.startListenService();
@@ -309,8 +308,7 @@
      // Test connection
      assertTrue(fakeReplicationDomain.isConnected());
      // Check connected server port
    HostPort rd =
        HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
    HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
      assertEquals(rd.getPort(), rsPort);
      return fakeReplicationDomain;
@@ -1253,17 +1251,9 @@
    // Fake RS 3 scenario
    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
    Object[][] result = new Object[objectArrayList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    {
      result[i] = objectArray.toArray();
      i++;
    }
    debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + i);
    return result;
    debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : "
        + objectArrayList.size());
    return toDataProvider(objectArrayList);
  }
  /**
@@ -1862,11 +1852,16 @@
    // Fake RS sends update in assured mode
    objectArrayList = addPossibleParameters(objectArrayList, true, false);
    Object[][] result = new Object[objectArrayList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    return toDataProvider(objectArrayList);
  }
  private Object[][] toDataProvider(List<List<Object>> listOfList)
    {
      result[i] = objectArray.toArray();
    Object[][] result = new Object[listOfList.size()][];
    int i = 0;
    for (List<Object> list : listOfList)
    {
      result[i] = list.toArray();
      i++;
    }
    return result;
@@ -2292,14 +2287,7 @@
    // Other additional RS scenario
    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ);
    Object[][] result = new Object[objectArrayList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    {
      result[i] = objectArray.toArray();
      i++;
    }
    return result;
    return toDataProvider(objectArrayList);
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -616,7 +616,7 @@
      // Add the root entry in the backend
      backend2 = initializeTestBackend(false, backendId2);
      backend2.setPrivateBackend(true);
      SortedSet<String> replServers = newSet("localhost:" + replicationServerPort);
      SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers);
      domain2 = startNewDomain(domainConf, null,null);
@@ -2870,10 +2870,10 @@
      // Add the root entry in the backend
      backend2 = initializeTestBackend(false, TEST_BACKEND_ID2);
      SortedSet<String> replServers = newSet("localhost:" + replicationServerPort);
      SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
      // on o=test2,sid=1702 include attrs set to : 'sn'
      SortedSet<String> eclInclude = newSet("sn", "roomnumber");
      SortedSet<String> eclInclude = newSortedSet("sn", "roomnumber");
      DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1702, replServers);
      domain2 = startNewDomain(domainConf, eclInclude, eclInclude);
@@ -2881,15 +2881,15 @@
      backend3 = initializeTestBackend(false, backendId3);
      // on o=test3,sid=1703 include attrs set to : 'objectclass'
      eclInclude = newSet("objectclass");
      eclInclude = newSortedSet("objectclass");
      SortedSet<String> eclIncludeForDeletes = newSet("*");
      SortedSet<String> eclIncludeForDeletes = newSortedSet("*");
      domainConf = new DomainFakeCfg(baseDN3, 1703, replServers);
      domain3 = startNewDomain(domainConf, eclInclude, eclIncludeForDeletes);
      // on o=test2,sid=1704 include attrs set to : 'cn'
      eclInclude = newSet("cn");
      eclInclude = newSortedSet("cn");
      domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
      domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
@@ -3021,11 +3021,6 @@
    }
  }
  /**
   * Builds a new sorted set holding the supplied strings.
   *
   * @param values
   *          the strings to put in the set
   * @return a new {@link TreeSet} containing the supplied values
   */
  private static SortedSet<String> newSet(String... values)
  {
    return new TreeSet<String>(Arrays.asList(values));
  }
  private LDAPReplicationDomain startNewDomain(DomainFakeCfg domainConf,
      SortedSet<String> eclInclude, SortedSet<String> eclIncludeForDeletes)
      throws Exception
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -30,7 +30,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,12 +68,8 @@
  private long generationID = 1;
  public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
  public FakeReplicationDomain(DN baseDN, int serverID,
      Set<String> replicationServers, int window, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(baseDN, serverID, 100);
@@ -82,15 +78,10 @@
    this.queue = queue;
  }
  public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
      String exportString,
      StringBuilder importString,
      int exportedEntryCount) throws ConfigException
  public FakeReplicationDomain(DN baseDN, int serverID,
      Set<String> replicationServers, int window, long heartbeatInterval,
      String exportString, StringBuilder importString, int exportedEntryCount)
      throws ConfigException
  {
    super(baseDN, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -27,12 +27,10 @@
 */
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +40,8 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
/**
 * This class is the minimum implementation of a Concrete ReplicationDomain
 * used to test the Generic Replication Service.
@@ -55,12 +55,8 @@
   */
  private BlockingQueue<UpdateMsg> queue = null;
  public FakeStressReplicationDomain(
      DN baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
  public FakeStressReplicationDomain(DN baseDN, int serverID,
      Set<String> replicationServers, int window, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(baseDN, serverID, 100);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -45,6 +45,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.testng.Assert.*;
/**
@@ -92,16 +93,13 @@
      replServer2 = createReplicationServer(replServerID2, replServerPort2,
          "ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1);
      List<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort1);
      Set<String> servers = newSet("localhost:" + replServerPort1);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
          testService, domain1ServerId, servers, 100, 1000, rcvQueue1);
      List<String> servers2 = new ArrayList<String>(1);
      servers2.add("localhost:" + replServerPort2);
      Set<String> servers2 = newSet("localhost:" + replServerPort2);
      BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
      domain2 = new FakeReplicationDomain(
          testService, domain2ServerId, servers2, 100, 1000, rcvQueue2);
@@ -218,9 +216,7 @@
      replServer1 = createReplicationServer(replServerID1, replServerPort,
          "ReplicationDomainTestDb", 100000, "localhost:" + replServerPort);
      List<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      Set<String> servers = newSet("localhost:" + replServerPort);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
          testService, domain1ServerId, servers, 1000, 100000, rcvQueue1);
@@ -321,8 +317,7 @@
      replServer = createReplicationServer(replServerID, replServerPort,
          "exportAndImportData", 100);
      List<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      Set<String> servers = newSet("localhost:" + replServerPort);
      StringBuilder exportedDataBuilder = new StringBuilder();
      for (int i =0; i<ENTRYCOUNT; i++)
@@ -399,11 +394,8 @@
      replServer2 = createReplicationServer(replServerID2, replServerPort2,
          "exportAndImportservice2", 100, "localhost:" + replServerPort1);
      List<String> servers1 = new ArrayList<String>(1);
      servers1.add("localhost:" + replServerPort1);
      List<String> servers2 = new ArrayList<String>(1);
      servers2.add("localhost:" + replServerPort2);
      Set<String> servers1 = newSet("localhost:" + replServerPort1);
      Set<String> servers2 = newSet("localhost:" + replServerPort2);
      StringBuilder exportedDataBuilder = new StringBuilder();
      for (int i =0; i<ENTRYCOUNT; i++)