From 55065c7531e93a725b02dc619f6c526228e768ce Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Oct 2013 14:19:46 +0000
Subject: [PATCH] LDAPReplicationDomain.java: Replaced instance fields with directly storing and using the ReplicationDomainCfg object.

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java             |    8 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java       |   45 -
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java    |   13 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java                |  147 ++----
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |   66 +-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java        |   14 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java    |   15 
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |  313 ++++++--------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java                 |   73 +--
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java        |    5 
 opends/src/messages/messages/replication.properties                                                              |    2 
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |   85 ---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |  346 +++++++----------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java       |   27 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java |   13 
 15 files changed, 497 insertions(+), 675 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 4bbda0c..dda3331 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -70,8 +70,6 @@
  base dn : %s
 MILD_ERR_ERROR_SEARCHING_RUV_15=Error %s when searching for server state %s : \
  %s base dn : %s
-NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
- server should be configured
 SEVERE_ERR_EXCEPTION_SENDING_TOPO_INFO_20=Caught IOException while sending \
  topology info (for update) on domain %s for %s server %s : %s
 MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 21767cc..d6ee68e 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -225,19 +225,7 @@
   private final SortedMap<CSN, FakeOperation> replayOperations =
     new TreeMap<CSN, FakeOperation>();
 
-  /**
-   * The isolation policy that this domain is going to use.
-   * This field describes the behavior of the domain when an update is
-   * attempted and the domain could not connect to any Replication Server.
-   * Possible values are accept-updates or deny-updates, but other values
-   * may be added in the future.
-   */
-  private IsolationPolicy isolationPolicy;
-
-  /**
-   * The DN of the configuration entry of this domain.
-   */
-  private final DN configDn;
+  private ReplicationDomainCfg config;
   private ExternalChangelogDomain eclDomain;
 
   /**
@@ -337,21 +325,6 @@
   private static final int FRACTIONAL_BECOME_NO_OP = 3;
 
   /**
-   * This configuration boolean indicates if this ReplicationDomain should log
-   * CSNs.
-   */
-  private boolean logCSN = false;
-
-  /**
-   * This configuration integer indicates the time the domain keeps the
-   * historical information necessary to solve conflicts.<br>
-   * When a change stored in the historical part of the user entry has a date
-   * (from its replication CSN) older than this delay, it is candidate to be
-   * purged.
-   */
-  private long histPurgeDelayInMilliSec = 0;
-
-  /**
    * The last CSN purged in this domain. Allows to have a continuous purging
    * process from one purge processing (task run) to the next one. Values 0 when
    * the server starts.
@@ -485,29 +458,14 @@
    * @throws ConfigException In case of invalid configuration.
    */
   public LDAPReplicationDomain(ReplicationDomainCfg configuration,
-    BlockingQueue<UpdateToReplay> updateToReplayQueue)
-    throws ConfigException
+      BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
   {
     super(configuration.getBaseDN(),
           configuration.getServerId(),
           configuration.getInitializationWindowSize());
 
-    // Read the configuration parameters.
-    Set<String> replicationServers = configuration.getReplicationServer();
-
-    int window  = configuration.getWindowSize();
-    /**
-     * The time in milliseconds between heartbeats from the replication
-     * server.  Zero means heartbeats are off.
-     */
-    long heartbeatInterval = configuration.getHeartbeatInterval();
-
-    this.isolationPolicy = configuration.getIsolationPolicy();
-    this.configDn = configuration.dn();
-    this.logCSN = configuration.isLogChangenumber();
+    this.config = configuration;
     this.updateToReplayQueue = updateToReplayQueue;
-    this.histPurgeDelayInMilliSec =
-      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
 
     // Get assured configuration
     readAssuredConfig(configuration, false);
@@ -566,8 +524,7 @@
     // register as an AlertGenerator
     DirectoryServer.registerAlertGenerator(this);
 
-    startPublishService(replicationServers, window, heartbeatInterval,
-        configuration.getChangetimeHeartbeatInterval());
+    startPublishService(configuration);
   }
 
   /**
@@ -1663,6 +1620,8 @@
       throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
     }
 
+    // FIXME should the next call use the initWindow parameter rather than the
+    // instance variable?
     super.initializeRemote(target, requestorID, initTask, this.initWindow);
   }
 
@@ -1850,6 +1809,7 @@
    */
   private boolean brokerIsConnected()
   {
+    final IsolationPolicy isolationPolicy = config.getIsolationPolicy();
     if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
     {
       // this policy imply that we always accept updates.
@@ -2131,7 +2091,7 @@
     // Note that a failed non-replication operation might not have a change
     // number.
     CSN curCSN = OperationContext.getCSN(op);
-    if (curCSN != null && logCSN)
+    if (curCSN != null && config.isLogChangenumber())
     {
       op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(),
           "replicationCSN", curCSN));
@@ -3526,7 +3486,7 @@
       {
         // If the base entry does not exist, save the generation
         // ID in the config entry
-        result = runSaveGenerationId(configDn, generationId);
+        result = runSaveGenerationId(config.dn(), generationId);
       }
 
       if (result != ResultCode.SUCCESS)
@@ -3573,7 +3533,7 @@
     {
       // if the base entry does not exist look for the generationID
       // in the config entry.
-      search = conn.processSearch(configDn.toString(),
+      search = conn.processSearch(config.dn().toString(),
           SearchScope.BASE_OBJECT,
           DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
           filter,attributes);
@@ -4125,16 +4085,8 @@
   public ConfigChangeResult applyConfigurationChange(
          ReplicationDomainCfg configuration)
   {
-    isolationPolicy = configuration.getIsolationPolicy();
-    logCSN = configuration.isLogChangenumber();
-    histPurgeDelayInMilliSec =
-      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
-
-    changeConfig(
-        configuration.getReplicationServer(),
-        configuration.getWindowSize(),
-        configuration.getHeartbeatInterval(),
-        (byte)configuration.getGroupId());
+    this.config = configuration;
+    changeConfig(configuration);
 
     // Read assured + fractional configuration and each time reconnect if needed
     readAssuredConfig(configuration, true);
@@ -4211,7 +4163,7 @@
   @Override
   public DN getComponentEntryDN()
   {
-    return configDn;
+    return config.dn();
   }
 
   /**
@@ -4234,7 +4186,7 @@
   {
     try
     {
-      DN eclConfigEntryDN = DN.decode("cn=external changeLog," + configDn);
+      DN eclConfigEntryDN = DN.decode("cn=external changeLog," + config.dn());
       if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
       {
         DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null);
@@ -4264,6 +4216,7 @@
     // unit test cases
     try
     {
+      DN configDn = config.dn();
       if (DirectoryServer.getConfigHandler().entryExists(configDn))
       {
         try
@@ -5272,14 +5225,14 @@
   }
 
   /**
-   * Return the purge delay (in ms) for the historical information stored
-   * in entries to solve conflicts for this domain.
+   * Return the minimum time (in ms) that the domain keeps the historical
+   * information necessary to solve conflicts.
    *
    * @return the purge delay.
    */
   public long getHistoricalPurgeDelay()
   {
-    return histPurgeDelayInMilliSec;
+    return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
   }
 
   /**
@@ -5348,7 +5301,7 @@
 
        EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
        lastCSNPurgedFromHist = entryHist.getOldestCSN();
-       entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
+       entryHist.setPurgeDelay(getHistoricalPurgeDelay());
        Attribute attr = entryHist.encodeAndPurge();
        count += entryHist.getLastPurgedValuesCount();
        List<Modification> mods = new LinkedList<Modification>();
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 585a7d2..1072f55 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
+import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.*;
@@ -72,10 +73,7 @@
   private static final DebugTracer TRACER = getTracer();
   private volatile boolean shutdown = false;
   private final Object startStopLock = new Object();
-  /**
-   * Replication server URLs under this format: "<code>hostname:port</code>".
-   */
-  private volatile Set<String> replicationServerUrls;
+  private volatile ReplicationDomainCfg config;
   private volatile boolean connected = false;
   /**
    * String reported under CSN=monitor when there is no connected RS.
@@ -84,18 +82,13 @@
   private volatile String replicationServer = NO_CONNECTED_SERVER;
   private volatile Session session;
   private final ServerState state;
-  private final DN baseDN;
-  private final int serverId;
   private Semaphore sendWindow;
   private int maxSendWindow;
   private int rcvWindow = 100;
   private int halfRcvWindow = rcvWindow / 2;
-  private int maxRcvWindow = rcvWindow;
   private int timeout = 0;
   private short protocolVersion;
   private ReplSessionSecurity replSessionSecurity;
-  /** My group id. */
-  private byte groupId = -1;
   /** The group id of the RS we are connected to. */
   private byte rsGroupId = -1;
   /** The server id of the RS we are connected to. */
@@ -117,11 +110,6 @@
   private Map<Integer, ServerState> replicaStates =
     new HashMap<Integer, ServerState>();
   /**
-   * The expected duration in milliseconds between heartbeats received
-   * from the replication server.  Zero means heartbeats are off.
-   */
-  private long heartbeatInterval = 0;
-  /**
    * A thread to monitor heartbeats on the session.
    */
   private HeartbeatMonitor heartbeatMonitor;
@@ -144,11 +132,6 @@
    * change time of this DS.
    */
   private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
-  /**
-   * The expected period in milliseconds between these messages are sent
-   * to the replication server. Zero means heartbeats are off.
-   */
-  private long changeTimeHeartbeatSendInterval = 0;
   /*
    * Properties for the last topology info received from the network.
    */
@@ -199,40 +182,23 @@
    * @param replicationDomain The replication domain that is creating us.
    * @param state The ServerState that should be used by this broker
    *        when negotiating the session with the replicationServer.
-   * @param baseDN The base DN that should be used by this broker
-   *        when negotiating the session with the replicationServer.
-   * @param serverId The server ID that should be used by this broker
-   *        when negotiating the session with the replicationServer.
-   * @param window The size of the send and receive window to use.
+   * @param config The configuration to use.
    * @param generationId The generationId for the server associated to the
    * provided serverId and for the domain associated to the provided baseDN.
-   * @param heartbeatInterval The interval (in ms) between heartbeats requested
-   *        from the replicationServer, or zero if no heartbeats are requested.
    * @param replSessionSecurity The session security configuration.
-   * @param groupId The group id of our domain.
-   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
-   *        time  heartbeats are sent to the RS,
-   *        or zero if no CSN heartbeat should be sent.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
-    ServerState state, DN baseDN, int serverId, int window,
-    long generationId, long heartbeatInterval,
-    ReplSessionSecurity replSessionSecurity, byte groupId,
-    long changeTimeHeartbeatInterval)
+      ServerState state, ReplicationDomainCfg config, long generationId,
+      ReplSessionSecurity replSessionSecurity)
   {
     this.domain = replicationDomain;
-    this.baseDN = baseDN;
-    this.serverId = serverId;
     this.state = state;
+    this.config = config;
     this.protocolVersion = ProtocolVersion.getCurrentVersion();
     this.replSessionSecurity = replSessionSecurity;
-    this.groupId = groupId;
     this.generationID = generationId;
-    this.heartbeatInterval = heartbeatInterval;
-    this.rcvWindow = window;
-    this.maxRcvWindow = window;
-    this.halfRcvWindow = window / 2;
-    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
+    this.rcvWindow = getMaxRcvWindow();
+    this.halfRcvWindow = rcvWindow / 2;
 
     /*
      * Only create a monitor if there is a replication domain (this is not the
@@ -251,31 +217,7 @@
     synchronized (startStopLock)
     {
       shutdown = false;
-      this.rcvWindow = this.maxRcvWindow;
-      connect();
-    }
-  }
-
-  /**
-   * Start the ReplicationBroker.
-   *
-   * @param replicationServers list of servers used
-   */
-  public void start(Set<String> replicationServers)
-  {
-    synchronized (startStopLock)
-    {
-      // Open Socket to the ReplicationServer Send the Start message
-      shutdown = false;
-      this.replicationServerUrls = replicationServers;
-
-      if (this.replicationServerUrls.size() < 1)
-      {
-        Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
-        logError(message);
-      }
-
-      this.rcvWindow = this.maxRcvWindow;
+      this.rcvWindow = getMaxRcvWindow();
       connect();
     }
   }
@@ -304,7 +246,22 @@
    */
   public int getServerId()
   {
-    return serverId;
+    return config.getServerId();
+  }
+
+  private DN getBaseDN()
+  {
+    return config.getBaseDN();
+  }
+
+  private Set<String> getReplicationServerUrls()
+  {
+    return config.getReplicationServer();
+  }
+
+  private byte getGroupId()
+  {
+    return (byte) config.getGroupId();
   }
 
   /**
@@ -358,7 +315,7 @@
       replicationServerInfo.setLocallyConfigured(false);
       return;
     }
-    for (String serverUrl : replicationServerUrls)
+    for (String serverUrl : getReplicationServerUrls())
     {
       if (isSameReplicationServerUrl(serverUrl, rsUrl))
       {
@@ -725,7 +682,7 @@
 
   private void connect()
   {
-    if (this.baseDN.toNormalizedString().equalsIgnoreCase(
+    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
         ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
     {
       connectAsECL();
@@ -745,7 +702,7 @@
     Map<Integer, ReplicationServerInfo> rsInfos =
       new ConcurrentHashMap<Integer, ReplicationServerInfo>();
 
-    for (String serverUrl : replicationServerUrls)
+    for (String serverUrl : getReplicationServerUrls())
     {
       // Connect to server and get info about it
       ReplicationServerInfo replicationServerInfo =
@@ -782,7 +739,7 @@
   private void connectAsECL()
   {
     // FIXME:ECL List of RS to connect is for now limited to one RS only
-    String bestServer = this.replicationServerUrls.iterator().next();
+    String bestServer = getReplicationServerUrls().iterator().next();
 
     if (performPhaseOneHandshake(bestServer, true, true) != null)
     {
@@ -841,6 +798,9 @@
 
     synchronized (connectPhaseLock)
     {
+      final int serverId = getServerId();
+      final DN baseDN = getBaseDN();
+
       /*
        * Connect to each replication server and get their ServerState then find
        * out which one is the best to connect to.
@@ -859,7 +819,7 @@
       {
         // At least one server answered, find the best one.
         electedRsInfo = computeBestReplicationServer(true, -1, state,
-          replicationServerInfos, serverId, groupId, getGenerationID());
+          replicationServerInfos, serverId, getGroupId(), getGenerationID());
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
@@ -940,8 +900,7 @@
           if (replicationServerInfos.size() > 0)
           {
             Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
-                serverId,
-                baseDN.toNormalizedString(),
+                serverId, baseDN.toNormalizedString(),
                 collectionToString(replicationServerInfos.keySet(), ", "));
             logError(message);
           }
@@ -969,6 +928,8 @@
   private void connectToReplicationServer(ReplicationServerInfo rsInfo,
       ServerStatus initStatus, TopologyMsg topologyMsg)
   {
+    final int serverId = getServerId();
+    final DN baseDN = getBaseDN();
     try
     {
       replicationServer = session.getReadableRemoteAddress();
@@ -1007,7 +968,7 @@
         }
       }
       sendWindow = new Semaphore(maxSendWindow);
-      rcvWindow = maxRcvWindow;
+      rcvWindow = getMaxRcvWindow();
       connected = true;
 
       /*
@@ -1020,6 +981,7 @@
             .getGenerationId(), session);
       }
 
+      final byte groupId = getGroupId();
       if (getRsGroupId() != groupId)
       {
         /*
@@ -1094,8 +1056,8 @@
       int nChanges = ServerState.diffChanges(rsState, state);
       if (debugEnabled())
       {
-        TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
-            + serverId + " computed " + nChanges + " changes late.");
+        TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
+            + getServerId() + " computed " + nChanges + " changes late.");
       }
 
       /*
@@ -1157,15 +1119,15 @@
       StartMsg serverStartMsg;
       if (!isECL)
       {
-        serverStartMsg = new ServerStartMsg(serverId, url,
-            baseDN, maxRcvWindow, heartbeatInterval, state,
-            getGenerationID(), isSslEncryption, groupId);
+        serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
+            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
+            getGenerationID(), isSslEncryption, getGroupId());
       }
       else
       {
         serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
-            maxRcvWindow, heartbeatInterval, state,
-            getGenerationID(), isSslEncryption, groupId);
+            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
+            getGenerationID(), isSslEncryption, getGroupId());
       }
       localSession.publish(serverStartMsg);
 
@@ -1174,7 +1136,7 @@
       ReplicationMsg msg = localSession.receive();
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
+        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
             + serverStartMsg + "\nAND RECEIVED:\n" + msg);
       }
 
@@ -1184,10 +1146,10 @@
 
       // Sanity check
       DN repDN = replServerInfo.getBaseDN();
-      if (!baseDN.equals(repDN))
+      if (!getBaseDN().equals(repDN))
       {
         errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
-            repDN.toNormalizedString(), baseDN.toNormalizedString());
+            repDN.toNormalizedString(), getBaseDN().toNormalizedString());
         return null;
       }
 
@@ -1222,20 +1184,21 @@
     }
     catch (ConnectException e)
     {
-      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
-          server, baseDN.toNormalizedString());
+      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
+          server, getBaseDN().toNormalizedString());
       return null;
     }
     catch (SocketTimeoutException e)
     {
-      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId,
-          server, baseDN.toNormalizedString());
+      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
+          server, getBaseDN().toNormalizedString());
       return null;
     }
     catch (Exception e)
     {
-      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
-          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
+      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
+          server, getBaseDN().toNormalizedString(),
+          stackTraceToSingleLineString(e));
       return null;
     }
     finally
@@ -1290,7 +1253,7 @@
       // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
+        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
             + startECLSessionMsg);
       }
 
@@ -1299,8 +1262,9 @@
       connected = true;
     } catch (Exception e)
     {
-      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
-          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
+      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
+          getServerId(), server, getBaseDN().toNormalizedString(),
+          stackTraceToSingleLineString(e));
       logError(message);
 
       setSession(null);
@@ -1356,7 +1320,7 @@
 
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
+        TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
             + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
       }
 
@@ -1365,8 +1329,9 @@
       return topologyMsg;
     } catch (Exception e)
     {
-      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
-          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
+      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
+          getServerId(), server, getBaseDN().toNormalizedString(),
+          stackTraceToSingleLineString(e));
       logError(message);
 
       setSession(null);
@@ -1405,7 +1370,6 @@
       Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
       byte groupId, long generationId)
   {
-
     // Shortcut, if only one server, this is the best
     if (rsInfos.size() == 1)
     {
@@ -1503,13 +1467,12 @@
   {
     Map<Integer, ReplicationServerInfo> result =
       new HashMap<Integer, ReplicationServerInfo>();
-
-    for (Integer rsId : bestServers.keySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-      if (replicationServerInfo.isLocallyConfigured())
+      ReplicationServerInfo rsInfo = entry.getValue();
+      if (rsInfo.isLocallyConfigured())
       {
-        result.put(rsId, replicationServerInfo);
+        result.put(entry.getKey(), rsInfo);
       }
     }
     return result;
@@ -1529,13 +1492,12 @@
   {
     Map<Integer, ReplicationServerInfo> result =
       new HashMap<Integer, ReplicationServerInfo>();
-
-    for (Integer rsId : bestServers.keySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-      if (replicationServerInfo.getGroupId() == groupId)
+      ReplicationServerInfo rsInfo = entry.getValue();
+      if (rsInfo.getGroupId() == groupId)
       {
-        result.put(rsId, replicationServerInfo);
+        result.put(entry.getKey(), rsInfo);
       }
     }
     return result;
@@ -1561,13 +1523,13 @@
       new HashMap<Integer, ReplicationServerInfo>();
     boolean emptyState = true;
 
-    for (Integer rsId : bestServers.keySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-      if (replicationServerInfo.getGenerationId() == generationId)
+      ReplicationServerInfo rsInfo = entry.getValue();
+      if (rsInfo.getGenerationId() == generationId)
       {
-        result.put(rsId, replicationServerInfo);
-        if (!replicationServerInfo.serverState.isEmpty())
+        result.put(entry.getKey(), rsInfo);
+        if (!rsInfo.serverState.isEmpty())
           emptyState = false;
       }
     }
@@ -1576,12 +1538,12 @@
     {
       // If the RS with a generationId have all an empty state,
       // then the 'empty'(genId=-1) RSes are also candidate
-      for (Integer rsId : bestServers.keySet())
+      for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
       {
-        ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-        if (replicationServerInfo.getGenerationId() == -1)
+        ReplicationServerInfo rsInfo = entry.getValue();
+        if (rsInfo.getGenerationId() == -1)
         {
-          result.put(rsId, replicationServerInfo);
+          result.put(entry.getKey(), rsInfo);
         }
       }
     }
@@ -1615,18 +1577,18 @@
     }
 
     /**
-     * Find replication servers who are up to date (or more up to date than us,
+     * Find replication servers that are up to date (or more up to date than us,
      * if for instance we failed and restarted, having sent some changes to the
      * RS but without having time to store our own state) regarding our own
-     * server id. If some servers more up to date, prefer this list but take
+     * server id. If some servers are more up to date, prefer this list but take
      * only the latest CSN.
      */
     CSN latestRsCSN = null;
-    for (Integer rsId : bestServers.keySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-      ServerState rsState = replicationServerInfo.getServerState();
-      CSN rsCSN = rsState.getCSN(localServerId);
+      final Integer rsId = entry.getKey();
+      final ReplicationServerInfo rsInfo = entry.getValue();
+      CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
       if (rsCSN == null)
       {
         rsCSN = new CSN(0, 0, localServerId);
@@ -1639,7 +1601,7 @@
         {
           // This replication server has exactly the latest change from the
           // local server
-          upToDateServers.put(rsId, replicationServerInfo);
+          upToDateServers.put(rsId, rsInfo);
         } else
         {
           // This replication server is even more up to date than the local
@@ -1653,13 +1615,13 @@
           {
             if (rsCSN.equals(latestRsCSN))
             {
-              moreUpToDateServers.put(rsId, replicationServerInfo);
+              moreUpToDateServers.put(rsId, rsInfo);
             } else
             {
               // This RS is even more up to date, clear the list and store this
               // new RS
               moreUpToDateServers.clear();
-              moreUpToDateServers.put(rsId, replicationServerInfo);
+              moreUpToDateServers.put(rsId, rsInfo);
               latestRsCSN = rsCSN;
             }
           }
@@ -1694,30 +1656,32 @@
      * Initially look for all servers on the same host. If we find one in the
      * same VM, then narrow the search.
      */
-    boolean filterServersInSameVM = false;
-    Map<Integer, ReplicationServerInfo> result =
+    boolean foundRSInSameVM = false;
+    final Map<Integer, ReplicationServerInfo> result =
         new HashMap<Integer, ReplicationServerInfo>();
-    for (Integer rsId : bestServers.keySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
-      final HostPort hp =
-          HostPort.valueOf(replicationServerInfo.getServerURL());
+      final Integer rsId = entry.getKey();
+      final ReplicationServerInfo rsInfo = entry.getValue();
+      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
       if (hp.isLocalAddress())
       {
         if (isLocalReplicationServerPort(hp.getPort()))
         {
-          // An RS in the same VM will always have priority.
-          if (!filterServersInSameVM)
+          if (!foundRSInSameVM)
           {
+            // An RS in the same VM will always have priority.
             // Narrow the search to only include servers in this VM.
             result.clear();
-            filterServersInSameVM = true;
+            foundRSInSameVM = true;
           }
-          result.put(rsId, replicationServerInfo);
+          result.put(rsId, rsInfo);
         }
-        else if (!filterServersInSameVM)
+        else if (!foundRSInSameVM)
         {
-          result.put(rsId, replicationServerInfo);
+          // OK, accept RSs on the same machine because we have not found an RS
+          // in the same VM yet
+          result.put(rsId, rsInfo);
         }
         else
         {
@@ -1775,19 +1739,19 @@
     Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
     // Precision for the operations (number of digits after the dot)
     final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
-    for (Integer rsId : bestServers.keySet())
+    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
     {
-      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+      final Integer rsId = entry.getKey();
+      final ReplicationServerInfo rsInfo = entry.getValue();
 
-      int rsWeight = replicationServerInfo.getWeight();
       //  load goal = rs weight / sum of weights
-      BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
+      BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide(
           BigDecimal.valueOf(sumOfWeights), mathContext);
       BigDecimal currentLoadBd = BigDecimal.ZERO;
       if (sumOfConnectedDSs != 0)
       {
         // current load = number of connected DSs / total number of DSs
-        int connectedDSs = replicationServerInfo.getConnectedDSNumber();
+        int connectedDSs = rsInfo.getConnectedDSNumber();
         currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
             BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
       }
@@ -1944,7 +1908,7 @@
                   mathContext);
 
           /*
-          Now compare both values: we must no disconnect the DS if this
+          Now compare both values: we must not disconnect the DS if this
           is for going in a situation where the load distance of the other
           RSs is the opposite of the future load distance of the local RS
           or we would evaluate that we should disconnect just after being
@@ -2011,10 +1975,11 @@
   private void startRSHeartBeatMonitoring()
   {
     // Start a heartbeat monitor thread.
+    final long heartbeatInterval = config.getHeartbeatInterval();
     if (heartbeatInterval > 0)
     {
       heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
-          baseDN.toNormalizedString(), session, heartbeatInterval);
+          getBaseDN().toNormalizedString(), session, heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
@@ -2081,7 +2046,7 @@
         {
           MessageBuilder mb = new MessageBuilder();
           mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
-              baseDN.toNormalizedString(), e.getLocalizedMessage()));
+              getBaseDN().toNormalizedString(), e.getLocalizedMessage()));
           mb.append(stackTraceToSingleLineString(e));
           logError(mb.toMessage());
         }
@@ -2339,6 +2304,8 @@
         break;
       }
 
+      final int serverId = getServerId();
+      final DN baseDN = getBaseDN();
       final int previousRsServerID = rsServerId;
       try
       {
@@ -2428,7 +2395,8 @@
               // best server checking.
               final ReplicationServerInfo bestServerInfo =
                   computeBestReplicationServer(false, previousRsServerID, state,
-                      replicationServerInfos, serverId, groupId, generationID);
+                      replicationServerInfos, serverId, getGroupId(),
+                      generationID);
               if (previousRsServerID != -1
                   && (bestServerInfo == null
                       || bestServerInfo.getServerId() != previousRsServerID))
@@ -2520,7 +2488,7 @@
     monitorResponse.set(false);
 
     // publish Monitor Request Message to the Replication Server
-    publish(new MonitorRequestMsg(serverId, getRsServerId()));
+    publish(new MonitorRequestMsg(getServerId(), getRsServerId()));
 
     // wait for Response up to 10 seconds.
     try
@@ -2571,9 +2539,9 @@
   public void stop()
   {
     if (debugEnabled())
-      TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will"
-        + " close the connection to replication server " + rsServerId + " for"
-        + " domain " + baseDN);
+      TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
+          + " and will close the connection to replication server "
+          + rsServerId + " for domain " + getBaseDN());
 
     synchronized (startStopLock)
     {
@@ -2630,7 +2598,7 @@
    */
   public int getMaxRcvWindow()
   {
-    return maxRcvWindow;
+    return config.getWindowSize();
   }
 
   /**
@@ -2679,16 +2647,11 @@
   /**
    * Change some configuration parameters.
    *
-   * @param replicationServers  The new list of replication servers.
-   * @param window              The max window size.
-   * @param heartbeatInterval   The heartBeat interval.
-   *
+   * @param newConfig  The new config to use.
    * @return                    A boolean indicating if the changes
    *                            requires to restart the service.
-   * @param groupId            The new group id to use
    */
-  public boolean changeConfig(Set<String> replicationServers, int window,
-      long heartbeatInterval, byte groupId)
+  public boolean changeConfig(ReplicationDomainCfg newConfig)
   {
     // These parameters needs to be renegotiated with the ReplicationServer
     // so if they have changed, that requires restarting the session with
@@ -2696,18 +2659,14 @@
     // A new session is necessary only when information regarding
     // the connection is modified
     boolean needToRestartSession =
-        this.replicationServerUrls == null
-        || !replicationServers.equals(this.replicationServerUrls)
-        || window != this.maxRcvWindow
-        || heartbeatInterval != this.heartbeatInterval
-        || groupId != this.groupId;
+        !newConfig.getReplicationServer().equals(config.getReplicationServer())
+        || newConfig.getWindowSize() != config.getWindowSize()
+        || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval()
+        || newConfig.getGroupId() != config.getGroupId();
 
-    this.replicationServerUrls = replicationServers;
-    this.rcvWindow = window;
-    this.maxRcvWindow = window;
-    this.halfRcvWindow = window / 2;
-    this.heartbeatInterval = heartbeatInterval;
-    this.groupId = groupId;
+    this.config = newConfig;
+    this.rcvWindow = newConfig.getWindowSize();
+    this.halfRcvWindow = this.rcvWindow / 2;
 
     return needToRestartSession;
   }
@@ -2756,23 +2715,14 @@
     } catch (IOException ex)
     {
       Message message = ERR_EXCEPTION_SENDING_CS.get(
-        baseDN.toNormalizedString(),
-        Integer.toString(serverId),
+        getBaseDN().toNormalizedString(),
+        Integer.toString(getServerId()),
         ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
       logError(message);
     }
   }
 
   /**
-   * Sets the group id of the broker.
-   * @param groupId The new group id.
-   */
-  public void setGroupId(byte groupId)
-  {
-    this.groupId = groupId;
-  }
-
-  /**
    * Gets the info for DSs in the topology (except us).
    * @return The info for DSs in the topology (except us)
    */
@@ -2815,7 +2765,7 @@
       sent by the replication server in the topology message. We must count
       ourselves as a connected server.
       */
-      connectedDSs.add(serverId);
+      connectedDSs.add(getServerId());
     }
 
     for (DSInfo dsInfo : dsList)
@@ -2907,21 +2857,23 @@
   /**
    * Starts publishing to the RS the current timestamp used in this server.
    */
-  public void startChangeTimeHeartBeatPublishing()
+  private void startChangeTimeHeartBeatPublishing()
   {
     // Start a CSN heartbeat thread.
-    if (changeTimeHeartbeatSendInterval > 0)
+    long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
+    if (changeTimeHeartbeatInterval > 0)
     {
       final Session localSession = session;
       final String threadName = "Replica DS(" + getServerId()
           + ") change time heartbeat publisher for domain \""
-          + baseDN + "\" to RS(" + getRsServerId()
+          + getBaseDN() + "\" to RS(" + getRsServerId()
           + ") at " + localSession.getReadableRemoteAddress();
 
       ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
-          threadName, localSession, changeTimeHeartbeatSendInterval, serverId);
+          threadName, localSession, changeTimeHeartbeatInterval, getServerId());
       ctHeartbeatPublisherThread.start();
-    } else
+    }
+    else
     {
       if (debugEnabled())
         TRACER.debugInfo(this
@@ -2932,7 +2884,7 @@
   /**
    * Stops publishing to the RS the current timestamp used in this server.
    */
-  public synchronized void stopChangeTimeHeartBeatPublishing()
+  private synchronized void stopChangeTimeHeartBeatPublishing()
   {
     if (ctHeartbeatPublisherThread != null)
     {
@@ -2942,17 +2894,6 @@
   }
 
   /**
-   * Set a new change time heartbeat interval to this broker.
-   * @param changeTimeHeartbeatInterval The new interval (in ms).
-   */
-  public void setChangeTimeHeartbeatInterval(int changeTimeHeartbeatInterval)
-  {
-    stopChangeTimeHeartBeatPublishing();
-    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
-    startChangeTimeHeartBeatPublishing();
-  }
-
-  /**
    * Set the connectRequiresRecovery to the provided value.
    * This flag is used to indicate if a recovery of Update is necessary
    * after a reconnection to a RS.
@@ -3044,8 +2985,9 @@
   {
     final StringBuilder sb = new StringBuilder();
     sb.append(getClass().getSimpleName())
-      .append(" \"").append(baseDN).append(" ").append(serverId).append("\",")
-      .append(" groupId=").append(groupId)
+      .append(" \"").append(getBaseDN()).append(" ")
+      .append(getServerId()).append("\",")
+      .append(" groupId=").append(getGroupId())
       .append(", genId=").append(generationID)
       .append(", connected=").append(connected).append(", ");
     if (rsServerId == -1)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4cb07d3..18d1730 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -41,6 +41,7 @@
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
+import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
@@ -68,8 +69,7 @@
  *   The startup phase of the ReplicationDomain subclass,
  *   should read the list of replication servers from the configuration,
  *   instantiate a {@link ServerState} then start the publish service
- *   by calling
- *   {@link #startPublishService(Set, int, long, long)}.
+ *   by calling {@link #startPublishService(ReplicationDomainCfg)}.
  *   At this point it can start calling the {@link #publish(UpdateMsg)}
  *   method if needed.
  * <p>
@@ -274,7 +274,7 @@
    * - and each initialized/importer DS that publishes acknowledges each
    *   WINDOW/2 data msg received.
    */
-  protected int initWindow = 100;
+  protected final int initWindow;
 
   /* Status related monitoring fields */
 
@@ -304,8 +304,7 @@
 
   private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
     new HashMap<Integer, Set<String>>();
-  private Set<String> eclIncludesForDeletesAllServers = Collections
-      .emptySet();
+  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
 
   /**
    * An object used to protect the initialization of the underlying broker
@@ -363,6 +362,7 @@
   {
     this.baseDN = baseDN;
     this.serverID = serverID;
+    this.initWindow = 100;
     this.state = serverState;
     this.generator = new CSNGenerator(serverID, state);
 
@@ -1060,7 +1060,7 @@
     public void run()
     {
       if (debugEnabled())
-        TRACER.debugInfo("[IE] starting " + this.getName());
+        TRACER.debugInfo("[IE] starting " + getName());
       try
       {
         initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
@@ -1075,7 +1075,7 @@
       }
 
       if (debugEnabled())
-        TRACER.debugInfo("[IE] ending " + this.getName());
+        TRACER.debugInfo("[IE] ending " + getName());
     }
   }
 
@@ -1313,7 +1313,7 @@
    */
   public int decodeTarget(String targetString) throws DirectoryException
   {
-    if (targetString.equalsIgnoreCase("all"))
+    if ("all".equalsIgnoreCase(targetString))
     {
       return RoutableMsg.ALL_SERVERS;
     }
@@ -1612,7 +1612,7 @@
             "[IE] wait for start dsid " + dsi.getDsId()
             + " " + dsi.getStatus()
             + " " + dsi.getGenerationId()
-            + " " + this.getGenerationID());
+            + " " + getGenerationID());
         if (ieContext.startList.contains(dsi.getDsId()))
         {
           if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
@@ -1711,7 +1711,7 @@
           }
           else
           {
-            if (dsInfo.getGenerationId() == this.getGenerationID())
+            if (dsInfo.getGenerationId() == getGenerationID())
             { // and with the expected generationId
               // We're done with this server
               it.remove();
@@ -1757,8 +1757,7 @@
     {
       // Rejects 2 simultaneous exports
       Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
-      throw new DirectoryException(ResultCode.OTHER,
-          message);
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
 
     ieContext = new IEContext(importInProgress);
@@ -1777,34 +1776,30 @@
    */
   private void processErrorMsg(ErrorMsg errorMsg)
   {
-    if (ieContext != null)
+    //Exporting must not be stopped on the first error, if we run initialize-all
+    if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
     {
-      /*
-        Exporting must not be stopped on the first error, if we
-        run initialize-all.
-      */
-      if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
+      // The ErrorMsg is received while we have started an initialization
+      if (ieContext.getException() == null)
       {
-        // The ErrorMsg is received while we have started an initialization
-        if (ieContext.getException() == null)
-          ieContext.setException(new DirectoryException(ResultCode.OTHER,
-              errorMsg.getDetails()));
+        ieContext.setException(
+            new DirectoryException(ResultCode.OTHER, errorMsg.getDetails()));
+      }
 
-        /*
-         * This can happen :
-         * - on the first InitReqMsg sent when source in not known for example
-         * - on the next attempt when source crashed and did not reconnect
-         *   even after the nextInitAttemptDelay
-         * During the import, the ErrorMsg will be received by receiveEntryBytes
-         */
-        if (ieContext.initializeTask instanceof InitializeTask)
-        {
-          // Update the task that initiated the import
-          ((InitializeTask)ieContext.initializeTask).
-          updateTaskCompletionState(ieContext.getException());
+      /*
+       * This can happen :
+       * - on the first InitReqMsg sent when source in not known for example
+       * - on the next attempt when source crashed and did not reconnect
+       *   even after the nextInitAttemptDelay
+       * During the import, the ErrorMsg will be received by receiveEntryBytes
+       */
+      if (ieContext.initializeTask instanceof InitializeTask)
+      {
+        // Update the task that initiated the import
+        ((InitializeTask) ieContext.initializeTask)
+            .updateTaskCompletionState(ieContext.getException());
 
-          releaseIEContext();
-        }
+        releaseIEContext();
       }
     }
   }
@@ -1894,8 +1889,7 @@
         {
           /*
           This is the normal termination of the import
-          No error is stored and the import is ended
-          by returning null
+          No error is stored and the import is ended by returning null
           */
           return null;
         }
@@ -1903,8 +1897,7 @@
         {
           /*
           This is an error termination during the import
-          The error is stored and the import is ended
-          by returning null
+          The error is stored and the import is ended by returning null
           */
           if (ieContext.getException() == null)
           {
@@ -1921,8 +1914,8 @@
         {
           // Other messages received during an import are trashed except
           // the topologyMsg.
-          if ((msg instanceof TopologyMsg) &&
-              (isRemoteDSConnected(ieContext.importSource)==null))
+          if (msg instanceof TopologyMsg
+              && isRemoteDSConnected(ieContext.importSource) == null)
           {
             Message errMsg =
               Message.raw(Category.SYNC, Severity.NOTICE,
@@ -2043,8 +2036,8 @@
         catch(Exception e) { /* do nothing */ }
 
         // process any connection error
-        if ((broker.hasConnectionError())||
-            (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
+        if (broker.hasConnectionError()
+          || broker.getNumLostConnections() != ieContext.initNumLostConnections)
         {
           // publish failed - store the error in the ieContext ...
           DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2485,8 +2478,7 @@
    * @throws DirectoryException When the generation ID of the Replication
    *                            Servers is not the expected value.
    */
-  private void checkGenerationID(long generationID)
-  throws DirectoryException
+  private void checkGenerationID(long generationID) throws DirectoryException
   {
     boolean allSet = true;
 
@@ -2535,7 +2527,7 @@
   public void resetReplicationLog() throws DirectoryException
   {
     // Reset the Generation ID to -1 to clean the ReplicationServers.
-    resetGenerationId((long)-1);
+    resetGenerationId(-1L);
 
     // check that at least one ReplicationServer did change its generation-id
     checkGenerationID(-1);
@@ -2573,43 +2565,35 @@
    * @throws DirectoryException   When an error occurs
    */
   public void resetGenerationId(Long generationIdNewValue)
-  throws DirectoryException
+      throws DirectoryException
   {
     if (debugEnabled())
       TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
           + " resetGenerationId " + generationIdNewValue);
 
-    ResetGenerationIdMsg genIdMessage;
-
-    if (generationIdNewValue == null)
-    {
-      genIdMessage = new ResetGenerationIdMsg(this.getGenerationID());
-    }
-    else
-    {
-      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
-    }
+    ResetGenerationIdMsg genIdMessage =
+        new ResetGenerationIdMsg(getGenId(generationIdNewValue));
 
     if (!isConnected())
     {
-      ResultCode resultCode = ResultCode.OTHER;
       Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
           Integer.toString(serverID),
           Long.toString(genIdMessage.getGenerationId()));
-      throw new DirectoryException(
-         resultCode, message);
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
     broker.publish(genIdMessage);
 
     // check that at least one ReplicationServer did change its generation-id
-    if (generationIdNewValue == null)
+    checkGenerationID(getGenId(generationIdNewValue));
+  }
+
+  private long getGenId(Long generationIdNewValue)
+  {
+    if (generationIdNewValue != null)
     {
-      checkGenerationID(this.getGenerationID());
+      return generationIdNewValue;
     }
-    else
-    {
-      checkGenerationID(generationIdNewValue);
-    }
+    return getGenerationID();
   }
 
 
@@ -2945,24 +2929,17 @@
    */
 
   /**
-   * Start the publish mechanism of the Replication Service.
-   * After this method has been called, the publish service can be used
-   * by calling the {@link #publish(UpdateMsg)} method.
+   * Start the publish mechanism of the Replication Service. After this method
+   * has been called, the publish service can be used by calling the
+   * {@link #publish(UpdateMsg)} method.
    *
-   * @param replicationServers   The replication servers that should be used.
-   * @param window               The window size of this replication domain.
-   * @param heartbeatInterval    The heartbeatInterval that should be used
-   *                             to check the availability of the replication
-   *                             servers.
-   * @param changetimeHeartbeatInterval  The interval used to send change
-   *                             time heartbeat to the replication server.
-   *
-   * @throws ConfigException     If the DirectoryServer configuration was
-   *                             incorrect.
+   * @param config
+   *          The configuration that should be used.
+   * @throws ConfigException
+   *           If the DirectoryServer configuration was incorrect.
    */
-  public void startPublishService(Set<String> replicationServers, int window,
-      long heartbeatInterval, long changetimeHeartbeatInterval)
-  throws ConfigException
+  public void startPublishService(ReplicationDomainCfg config)
+      throws ConfigException
   {
     synchronized (sessionLock)
     {
@@ -2970,15 +2947,8 @@
       {
         // create the broker object used to publish and receive changes
         broker = new ReplicationBroker(
-            this, state, baseDN,
-            serverID, window,
-            getGenerationID(),
-            heartbeatInterval,
-            new ReplSessionSecurity(),
-            getGroupId(),
-            changetimeHeartbeatInterval);
-
-        broker.start(replicationServers);
+            this, state, config, getGenerationID(), new ReplSessionSecurity());
+        broker.start();
       }
     }
   }
@@ -2990,7 +2960,7 @@
    * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
    * <p>
    * This method must be called once and must be called after the
-   * {@link #startPublishService(Collection, int, long, long)}.
+   * {@link #startPublishService(ReplicationDomainCfg)}.
    */
   public void startListenService()
   {
@@ -3040,12 +3010,12 @@
    * <p>
    * The Replication Service will restart from the point indicated by the
    * {@link ServerState} that was given as a parameter to the
-   * {@link #startPublishService(Collection, int, long, long)}
-   * at startup time.
+   * {@link #startPublishService(ReplicationDomainCfg)} at startup time.
+   * <p>
    * If some data have changed in the repository during the period of time when
    * the Replication Service was disabled, this {@link ServerState} should
-   * therefore be updated by the Replication Domain subclass before calling
-   * this method. This method is not MT safe.
+   * therefore be updated by the Replication Domain subclass before calling this
+   * method. This method is not MT safe.
    */
   public void enableService()
   {
@@ -3071,21 +3041,14 @@
   /**
    * Change some ReplicationDomain parameters.
    *
-   * @param replicationServers  The new set of Replication Servers that this
-   *                           domain should now use.
-   * @param windowSize         The window size that this domain should use.
-   * @param heartbeatInterval  The heartbeatInterval that this domain should
-   *                           use.
-   * @param groupId            The new group id to use
+   * @param config
+   *          The new configuration that this domain should now use.
    */
-  public void changeConfig(Set<String> replicationServers, int windowSize,
-      long heartbeatInterval, byte groupId)
+  public void changeConfig(ReplicationDomainCfg config)
   {
-    this.groupId = groupId;
+    this.groupId = (byte) config.getGroupId();
 
-    if (broker != null
-        && broker.changeConfig(replicationServers, windowSize,
-            heartbeatInterval, groupId))
+    if (broker != null && broker.changeConfig(config))
     {
       disableService();
       enableService();
@@ -3195,47 +3158,46 @@
     one. Only Safe Read mode makes sense in DS for returning an ack.
     */
     byte rsGroupId = broker.getRsGroupId();
-    if (msg.isAssured())
+    // Assured feature is supported starting from replication protocol V2
+    if (msg.isAssured()
+      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
     {
-      // Assured feature is supported starting from replication protocol V2
-      if (broker.getProtocolVersion() >=
-        ProtocolVersion.REPLICATION_PROTOCOL_V2)
+      AssuredMode msgAssuredMode = msg.getAssuredMode();
+      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
       {
-        AssuredMode msgAssuredMode = msg.getAssuredMode();
-        if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+        if (rsGroupId == groupId)
         {
-          if (rsGroupId == groupId)
+          // Send the ack
+          AckMsg ackMsg = new AckMsg(msg.getCSN());
+          if (replayErrorMsg != null)
           {
-            // Send the ack
-            AckMsg ackMsg = new AckMsg(msg.getCSN());
-            if (replayErrorMsg != null)
-            {
-              // Mark the error in the ack
-              //   -> replay error occurred
-              ackMsg.setHasReplayError(true);
-              //   -> replay error occurred in our server
-              List<Integer> idList = new ArrayList<Integer>();
-              idList.add(serverID);
-              ackMsg.setFailedServers(idList);
-            }
-            broker.publish(ackMsg);
-            if (replayErrorMsg != null)
-            {
-              assuredSrReceivedUpdatesNotAcked.incrementAndGet();
-            } else
-            {
-              assuredSrReceivedUpdatesAcked.incrementAndGet();
-            }
+            // Mark the error in the ack
+            //   -> replay error occurred
+            ackMsg.setHasReplayError(true);
+            //   -> replay error occurred in our server
+            List<Integer> idList = new ArrayList<Integer>();
+            idList.add(serverID);
+            ackMsg.setFailedServers(idList);
           }
-        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
-        {
-          Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
-              Integer.toString(serverID), msgAssuredMode.toString(),
-              getBaseDNString(), msg.toString());
-          logError(errorMsg);
+          broker.publish(ackMsg);
+          if (replayErrorMsg != null)
+          {
+            assuredSrReceivedUpdatesNotAcked.incrementAndGet();
+          }
+          else
+          {
+            assuredSrReceivedUpdatesAcked.incrementAndGet();
+          }
         }
-        // Nothing to do in Assured safe data mode, only RS ack updates.
       }
+      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+      {
+        Message errorMsg =
+            ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
+                msgAssuredMode.toString(), getBaseDNString(), msg.toString());
+        logError(errorMsg);
+      }
+        // Nothing to do in Assured safe data mode, only RS ack updates.
     }
 
     incProcessedUpdates();
@@ -3301,7 +3263,7 @@
   {
     byte rsGroupId = broker.getRsGroupId();
 
-    // If assured mode configured, wait for acknowledgement for the just sent
+    // If assured mode configured, wait for acknowledgment for the just sent
     // message
     if (assured && rsGroupId == groupId)
     {
@@ -3354,40 +3316,37 @@
           remove the update from the wait list, log the timeout error and
           also update assured monitoring counters
           */
-          UpdateMsg update = waitingAckMsgs.remove(csn);
-
-          if (update != null)
-          {
-            // No luck, this is a real timeout
-            // Increment assured replication monitoring counters
-            switch (msg.getAssuredMode())
-            {
-              case SAFE_READ_MODE:
-                assuredSrNotAcknowledgedUpdates.incrementAndGet();
-                assuredSrTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(
-                  assuredSrServerNotAcknowledgedUpdates,
-                  broker.getRsServerId());
-                break;
-              case SAFE_DATA_MODE:
-                assuredSdTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
-                  broker.getRsServerId());
-                break;
-              default:
-              // Should not happen
-            }
-
-            throw new TimeoutException("No ack received for message csn: "
-                + csn + " and replication servceID: " + baseDN + " after "
-                + assuredTimeout + " ms.");
-          } else
+          final UpdateMsg update = waitingAckMsgs.remove(csn);
+          if (update == null)
           {
             // Ack received just before timeout limit: we can exit
             break;
           }
+
+          // No luck, this is a real timeout
+          // Increment assured replication monitoring counters
+          switch (msg.getAssuredMode())
+          {
+          case SAFE_READ_MODE:
+            assuredSrNotAcknowledgedUpdates.incrementAndGet();
+            assuredSrTimeoutUpdates.incrementAndGet();
+            // Increment number of errors for our RS
+            updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
+                broker.getRsServerId());
+            break;
+          case SAFE_DATA_MODE:
+            assuredSdTimeoutUpdates.incrementAndGet();
+            // Increment number of errors for our RS
+            updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+                broker.getRsServerId());
+            break;
+          default:
+            // Should not happen
+          }
+
+          throw new TimeoutException("No ack received for message csn: " + csn
+              + " and replication domain: " + baseDN + " after "
+              + assuredTimeout + " ms.");
         }
       }
     }
@@ -3425,8 +3384,7 @@
       update = new UpdateMsg(generator.newCSN(), msg);
       /*
       If assured replication is configured, this will prepare blocking
-      mechanism. If assured replication is disabled, this returns
-      immediately
+      mechanism. If assured replication is disabled, this returns immediately
       */
       prepareWaitForAckIfAssuredEnabled(update);
 
@@ -3443,8 +3401,7 @@
       waitForAckIfAssuredEnabled(update);
     } catch (TimeoutException ex)
     {
-      // This exception may only be raised if assured replication is
-      // enabled
+      // This exception may only be raised if assured replication is enabled
       Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
           Long.toString(assuredTimeout), update.toString());
       logError(errorMsg);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index fba2fcf..966f5d0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -36,6 +36,7 @@
 import org.opends.messages.Severity;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.backends.task.TaskState;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.AddOperation;
@@ -45,10 +46,7 @@
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.plugin.GenerationIdChecksum;
-import org.opends.server.replication.plugin.LDAPReplicationDomain;
-import org.opends.server.replication.plugin.MultimasterReplication;
-import org.opends.server.replication.plugin.PersistentServerState;
+import org.opends.server.replication.plugin.*;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.Session;
@@ -62,6 +60,7 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.config.ConfigConstants.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -194,10 +193,10 @@
    * does not exist, take the 'empty backend' generationID.
    */
   protected ReplicationBroker openReplicationSession(final DN baseDN,
-      int serverId, int window_size, int port, int timeout,
+      int serverId, int windowSize, int port, int timeout,
       boolean emptyOldChanges) throws Exception
   {
-    return openReplicationSession(baseDN, serverId, window_size,
+    return openReplicationSession(baseDN, serverId, windowSize,
         port, timeout, emptyOldChanges, getGenerationId(baseDN), null);
   }
 
@@ -206,10 +205,10 @@
    * providing the generationId.
    */
   protected ReplicationBroker openReplicationSession(final DN baseDN,
-      int serverId, int window_size, int port, int timeout,
+      int serverId, int windowSize, int port, int timeout,
       boolean emptyOldChanges, long generationId) throws Exception
   {
-    return openReplicationSession(baseDN, serverId, window_size,
+    return openReplicationSession(baseDN, serverId, windowSize,
         port, timeout, emptyOldChanges, generationId, null);
   }
 
@@ -218,25 +217,42 @@
    * providing the generationId.
    */
   protected ReplicationBroker openReplicationSession(final DN baseDN,
-      int serverId, int window_size, int port, int timeout,
+      int serverId, int windowSize, int port, int timeout,
       boolean emptyOldChanges, long generationId,
       ReplicationDomain replicationDomain) throws Exception
   {
+    DomainFakeCfg config = newFakeCfg(baseDN, serverId, port);
+    config.setWindowSize(windowSize);
+    return openReplicationSession(config, port, timeout, emptyOldChanges,
+        generationId, replicationDomain);
+  }
+
+  protected ReplicationBroker openReplicationSession(ReplicationDomainCfg config,
+      int port, int timeout, boolean emptyOldChanges, long generationId,
+      ReplicationDomain replicationDomain) throws Exception
+  {
     ServerState state = new ServerState();
 
     if (emptyOldChanges)
-       new PersistentServerState(baseDN, serverId, new ServerState());
+      new PersistentServerState(config.getBaseDN(), config.getServerId(), new ServerState());
 
-    ReplicationBroker broker = new ReplicationBroker(replicationDomain,
-        state, baseDN, serverId, window_size,
-        generationId, 100000, getReplSessionSecurity(), (byte)1, 500);
+    ReplicationBroker broker = new ReplicationBroker(replicationDomain, state,
+        config, generationId, getReplSessionSecurity());
     connect(broker, port, timeout);
     return broker;
   }
 
-  private void connect(ReplicationBroker broker, int port, int timeout) throws Exception
+  protected DomainFakeCfg newFakeCfg(final DN baseDN, int serverId, int port)
   {
-    broker.start(Collections.singleton("localhost:" + port));
+    DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverId, newSortedSet("localhost:" + port));
+    fakeCfg.setHeartbeatInterval(100000);
+    fakeCfg.setChangetimeHeartbeatInterval(500);
+    return fakeCfg;
+  }
+
+  protected void connect(ReplicationBroker broker, int port, int timeout) throws Exception
+  {
+    broker.start();
     // give some time to the broker to connect to the replicationServer.
     checkConnection(30, broker, port);
 
@@ -274,33 +290,6 @@
     }
   }
 
-  /**
-   * Open a replicationServer session to the local ReplicationServer
-   * with a default value generationId.
-   */
-  protected ReplicationBroker openReplicationSession(final DN baseDN,
-      int serverId, int window_size, int port, int timeout, ServerState state)
-      throws Exception
-  {
-    return openReplicationSession(baseDN, serverId, window_size,
-        port, timeout, state, getGenerationId(baseDN));
-  }
-
-  /**
-   * Open a new session to the ReplicationServer
-   * starting with a given ServerState.
-   */
-  protected ReplicationBroker openReplicationSession(final DN baseDN,
-      int serverId, int window_size, int port, int timeout, ServerState state,
-      long generationId) throws Exception
-  {
-    ReplicationBroker broker = new ReplicationBroker(null,
-        state, baseDN, serverId, window_size, generationId,
-        100000, getReplSessionSecurity(), (byte)1, 500);
-    connect(broker, port, timeout);
-    return broker;
-  }
-
   protected void deleteEntry(DN dn) throws Exception
   {
     if (dn.getParent().getRDN().toString().equalsIgnoreCase("cn=domains"))
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
index 128c370..a56df75 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2007-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011 ForgeRock AS
+ *      Portions copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
@@ -31,11 +31,9 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.opends.server.admin.Configuration;
 import org.opends.server.admin.server.ConfigurationAddListener;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.server.ConfigurationDeleteListener;
-import org.opends.server.admin.server.ServerManagedObject;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
 import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
@@ -50,44 +48,44 @@
  */
 public class DomainFakeCfg implements ReplicationDomainCfg
 {
-  private DN baseDn;
+  private DN baseDN;
   private int serverId;
   private SortedSet<String> replicationServers;
   private long heartbeatInterval = 1000;
 
-  // By default changeTimeHeartbeatInterval is set to 0 in order to disable
-  // this feature and not kill the tests that expect to receive special
-  // messages.
+  /**
+   * By default changeTimeHeartbeatInterval is set to 0 in order to disable this
+   * feature and not kill the tests that expect to receive special messages.
+   */
   private long changeTimeHeartbeatInterval = 0;
 
   private IsolationPolicy policy = IsolationPolicy.REJECT_ALL_UPDATES;
 
-  // Is assured mode enabled or not ?
-  private boolean assured = false;
-  // Assured sub mode (used when assured is true)
+  /** Assured sub mode (used when assured is true) */
   private AssuredType assuredType = AssuredType.NOT_ASSURED;
-  // Safe Data level (used when assuredType is safe data)
+  /** Safe Data level (used when assuredType is safe data) */
   private int assuredSdLevel = 1;
-  // Timeout (in milliseconds) when waiting for acknowledgments
+  /** Timeout (in milliseconds) when waiting for acknowledgments */
   private long assuredTimeout = 1000;
-  // Group id
+  /** Group id */
   private int groupId = 1;
-  // Referrals urls to be published to other servers of the topology
-  SortedSet<String> refUrls = new TreeSet<String>();
+  /** Referrals urls to be published to other servers of the topology */
+  private SortedSet<String> refUrls = new TreeSet<String>();
 
   private SortedSet<String> fractionalExcludes = new TreeSet<String>();
   private SortedSet<String> fractionalIncludes = new TreeSet<String>();
 
   private ExternalChangelogDomainCfg eclCfg =
     new ExternalChangelogDomainFakeCfg(true, null, null);
+  private int windowSize = 100;
 
   /**
    * Creates a new Domain with the provided information
    * (assured mode disabled, default group id)
    */
-  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers)
+  public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers)
   {
-    this.baseDn = baseDn;
+    this.baseDN = baseDN;
     this.serverId = serverId;
     this.replicationServers = replServers;
   }
@@ -96,10 +94,10 @@
    * Creates a new Domain with the provided information
    * (with some fractional configuration provided)
    */
-  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers,
+  public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers,
     List<String> fractionalExcludes, List<String> fractionalIncludes)
   {
-    this(baseDn, serverId, replServers);
+    this(baseDN, serverId, replServers);
     if (fractionalExcludes != null)
     {
       for (String str : fractionalExcludes)
@@ -120,10 +118,10 @@
    * Creates a new Domain with the provided information
    * (assured mode disabled, group id provided)
    */
-  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers,
+  public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers,
     int groupId)
   {
-    this(baseDn, serverId, replServers);
+    this(baseDN, serverId, replServers);
     this.groupId = groupId;
   }
 
@@ -131,19 +129,17 @@
    * Creates a new Domain with the provided information
    * (assured mode info provided as well as group id)
    */
-  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers,
+  public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers,
     AssuredType assuredType, int assuredSdLevel, int groupId,
     long assuredTimeout, SortedSet<String> refUrls)
   {
-    this(baseDn, serverId, replServers);
+    this(baseDN, serverId, replServers);
     switch(assuredType)
     {
       case NOT_ASSURED:
-        assured = false;
         break;
       case SAFE_DATA:
       case SAFE_READ:
-        assured = true;
         this.assuredType = assuredType;
         break;
     }
@@ -157,33 +153,34 @@
   /**
    * Create a new Domain from the provided arguments.
    *
-   * @param string         The baseDN in string form.
+   * @param baseDN         The baseDN in string form.
    * @param serverId       The serverID.
    * @param replServer     The replication Server that will be used.
    *
    * @throws DirectoryException  When the provided string is not a valid DN.
    */
-  public DomainFakeCfg(String string, int serverId, String replServer)
+  public DomainFakeCfg(String baseDN, int serverId, String replServer)
          throws DirectoryException
   {
     this.replicationServers = new TreeSet<String>();
     this.replicationServers.add(replServer);
-    this.baseDn = DN.decode(string);
+    this.baseDN = DN.decode(baseDN);
     this.serverId = serverId;
   }
 
   /**
    * {@inheritDoc}
    */
+  @Override
   public void addChangeListener(
       ConfigurationChangeListener<ReplicationDomainCfg> listener)
   {
-
   }
 
   /**
    * {@inheritDoc}
    */
+  @Override
   public Class<? extends ReplicationDomainCfg> configurationClass()
   {
     return null;
@@ -192,6 +189,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public long getHeartbeatInterval()
   {
     return heartbeatInterval ;
@@ -200,14 +198,12 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public long getChangetimeHeartbeatInterval()
   {
     return changeTimeHeartbeatInterval;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   public void setChangetimeHeartbeatInterval(long changeTimeHeartbeatInterval)
   {
     this.changeTimeHeartbeatInterval = changeTimeHeartbeatInterval;
@@ -216,46 +212,16 @@
   /**
    * {@inheritDoc}
    */
-  public long getMaxReceiveDelay()
-  {
-    return 0;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public int getMaxReceiveQueue()
-  {
-    return 0;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public long getMaxSendDelay()
-  {
-    return 0;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public int getMaxSendQueue()
-  {
-    return 0;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
+  @Override
   public DN getBaseDN()
   {
-    return baseDn;
+    return baseDN;
   }
 
   /**
    * {@inheritDoc}
    */
+  @Override
   public SortedSet<String> getReplicationServer()
   {
     return replicationServers;
@@ -264,6 +230,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public int getServerId()
   {
     return serverId;
@@ -272,14 +239,21 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public int getWindowSize()
   {
-    return 100;
+    return this.windowSize;
+  }
+
+  public void setWindowSize(int windowSize)
+  {
+    this.windowSize = windowSize;
   }
 
   /**
    * {@inheritDoc}
    */
+  @Override
   public void removeChangeListener(
       ConfigurationChangeListener<ReplicationDomainCfg> listener)
   {
@@ -288,6 +262,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public DN dn()
   {
     try
@@ -300,16 +275,7 @@
   }
 
   /**
-   * {@inheritDoc}
-   */
-  public ServerManagedObject<? extends Configuration> managedObject() {
-    return null;
-  }
-
-  /**
    * Set the heartbeat interval.
-   *
-   * @param interval
    */
   public void setHeartbeatInterval(long interval)
   {
@@ -319,6 +285,7 @@
   /**
    * Get the isolation policy.
    */
+  @Override
   public IsolationPolicy getIsolationPolicy()
   {
     return policy;
@@ -334,66 +301,60 @@
     this.policy = policy;
   }
 
+  @Override
   public int getAssuredSdLevel()
   {
     return assuredSdLevel;
   }
 
+  @Override
   public int getGroupId()
   {
     return groupId;
   }
 
+  @Override
   public long getAssuredTimeout()
   {
     return assuredTimeout;
   }
 
+  @Override
   public AssuredType getAssuredType()
   {
     return assuredType;
   }
 
-  public boolean isAssured()
-  {
-    return assured;
-  }
-
+  @Override
   public SortedSet<String> getReferralsUrl()
   {
     return refUrls;
   }
 
+  @Override
   public SortedSet<String> getFractionalExclude()
   {
     return fractionalExcludes;
   }
 
+  @Override
   public SortedSet<String> getFractionalInclude()
   {
     return fractionalIncludes;
   }
 
+  @Override
   public boolean isSolveConflicts()
   {
     return true;
   }
 
-  public long getInitializationHeartbeatInterval()
-  {
-    return 180;
-  }
-
-
+  @Override
   public int getInitializationWindowSize()
   {
     return 100;
   }
 
-  public boolean hasExternalChangelogDomain() { return true; }
-
-
-
   /**
    * Gets the ECL Domain if it is present.
    *
@@ -402,6 +363,7 @@
    *           If the ECL Domain does not exist or it could not
    *           be successfully decoded.
    */
+  @Override
   public ExternalChangelogDomainCfg getExternalChangelogDomain()
   throws ConfigException
   { return eclCfg; }
@@ -410,7 +372,6 @@
   /**
    * Sets the ECL Domain if it is present.
    *
-   * @return Returns the ECL Domain if it is present.
    * @throws ConfigException
    *           If the ECL Domain does not exist or it could not
    *           be successfully decoded.
@@ -477,6 +438,7 @@
       ConfigurationDeleteListener<ExternalChangelogDomainCfg> listener)
   {}
 
+  @Override
   public boolean isLogChangenumber()
   {
     return true;
@@ -489,7 +451,8 @@
    * historical information necessary to solve conflicts.
    *
    * @return Returns the value of the "conflicts-historical-purge-delay" property.
-   **/
+   */
+  @Override
   public long getConflictsHistoricalPurgeDelay()
   {
     return 1440;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 3e9fc20..e6355af 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -375,10 +375,10 @@
   private void createFakeReplicationDomain(boolean firstBackend,
       long generationId) throws Exception
   {
-    Set<String> replicationServers = newSet("localhost:" + replServerPort);
+    SortedSet<String> replicationServers = newSortedSet("localhost:" + replServerPort);
 
     DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
-    replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId);
+    replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 1000, generationId);
 
     // Test connection
     assertTrue(replicationDomain.isConnected());
@@ -555,23 +555,26 @@
         new LinkedBlockingQueue<UpdateMsg>();
 
     /** A string that will be exported should exportBackend be called. */
-    private String exportString = null;
+    private String exportString;
 
     /**
      * A StringBuilder that will be used to build a new String should the import
      * be called.
      */
-    private StringBuilder importString = null;
+    private StringBuilder importString;
     private int exportedEntryCount;
     private long generationID = -1;
 
     public FakeReplicationDomain(DN baseDN, int serverID,
-        Set<String> replicationServers, int window, long heartbeatInterval,
+        SortedSet<String> replicationServers, long heartbeatInterval,
         long generationId) throws ConfigException
     {
       super(baseDN, serverID, 100);
       generationID = generationId;
-      startPublishService(replicationServers, window, heartbeatInterval, 500);
+      DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
+      fakeCfg.setHeartbeatInterval(heartbeatInterval);
+      fakeCfg.setChangetimeHeartbeatInterval(500);
+      startPublishService(fakeCfg);
       startListenService();
     }
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 3ae5b3a..acdf3c2 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -32,6 +32,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.assertj.core.api.Assertions;
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
@@ -65,11 +66,11 @@
 
   public static class TestBroker extends ReplicationBroker
   {
-    List<ReplicationMsg> list = null;
+    private List<ReplicationMsg> list;
 
     public TestBroker(List<ReplicationMsg> list)
     {
-      super(null, null, null, 0, 0, 0, 0, null, (byte) 0, 0);
+      super(null, null, new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), 0, null);
       this.list = list;
     }
 
@@ -157,9 +158,7 @@
     // Read the entry back to get its historical and included CSN
     Entry entry = DirectoryServer.getEntry(dn1);
     List<Attribute> attrs1 = entry.getAttribute(histType);
-
-    assertTrue(attrs1 != null);
-    assertTrue(attrs1.isEmpty() != true);
+      Assertions.assertThat(attrs1).isNotEmpty();
 
     String histValue =
       attrs1.get(0).iterator().next().getValue().toString();
@@ -178,9 +177,7 @@
 
     Entry entry2 = DirectoryServer.getEntry(dn1);
     List<Attribute> attrs2 = entry2.getAttribute(histType);
-
-    assertTrue(attrs2 != null);
-    assertTrue(attrs2.isEmpty() != true);
+      Assertions.assertThat(attrs2).isNotEmpty();
 
     for (AttributeValue av : attrs2.get(0)) {
       logError(Message.raw(Category.SYNC, Severity.INFORMATION,
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
index 48f26fd..e2e9df0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -58,6 +58,7 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.testng.Assert.*;
@@ -223,10 +224,11 @@
   private ReplicationBroker createReplicationBroker(int dsId,
       ServerState state, long generationId) throws Exception
   {
+    SortedSet<String> replServers = newSortedSet("localhost:" + rs1Port);
+    DomainFakeCfg fakeCfg = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers);
     ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
-    ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_,
-        dsId, 100, generationId, 0, security, (byte) 1, 500);
-    broker.start(Collections.singleton("localhost:" + rs1Port));
+    ReplicationBroker broker = new ReplicationBroker(null, state, fakeCfg, generationId, security);
+    broker.start();
     checkConnection(30, broker, rs1Port);
     return broker;
   }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 079a806..ceced03 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -40,10 +40,12 @@
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
 import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.*;
+import org.opends.server.replication.plugin.DomainFakeCfg;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.service.ReplicationDomain;
@@ -243,8 +245,9 @@
     int scenario)
         throws Exception
   {
-    return createFakeReplicationDomain(serverId, groupId, rsId, generationId, assured,
-      assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true, 100);
+    ReplicationDomainCfg config = newFakeCfg(serverId, getRsPort(rsId));
+    return createFakeReplicationDomain(config, groupId, rsId, generationId, assured,
+      assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true);
   }
 
   private int getRsPort(int rsId)
@@ -255,8 +258,6 @@
   /**
    * Creates a new fake replication domain, using the passed scenario.
    *
-   * @param serverId
-   *          The server ID for the replication domain.
    * @param groupId
    *          The group ID for the replication domain.
    * @param rsId
@@ -278,40 +279,44 @@
    * @param startListen
    *          If true, we start the listen service. In all cases, the publish
    *          service gets started.
-   * @param window
-   *          The window size for replication
    * @return
    *          The FakeReplicationDomain, a mock-up of a Replication Domain
    *          for tests
    * @throws Exception
-   *
    */
-  private FakeReplicationDomain createFakeReplicationDomain(int serverId,
-            int groupId, int rsId, long generationId, boolean assured,
-            AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
-            int scenario, ServerState serverState, boolean startListen, int window)
-      throws Exception
+  private FakeReplicationDomain createFakeReplicationDomain(
+      ReplicationDomainCfg config, int groupId, int rsId, long generationId,
+      boolean assured, AssuredMode assuredMode, int safeDataLevel,
+      long assuredTimeout, int scenario, ServerState serverState,
+      boolean startListen) throws Exception
   {
-      // Set port to right real RS according to its id
-      int rsPort = getRsPort(rsId);
+    // Set port to right real RS according to its id
+    int rsPort = getRsPort(rsId);
 
-      FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
-        DN.decode(TEST_ROOT_DN_STRING), serverId, generationId,
-        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
-        scenario, serverState);
+    FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
+        config.getBaseDN(), config.getServerId(), generationId, (byte) groupId,
+        assured, assuredMode, (byte) safeDataLevel, assuredTimeout, scenario, serverState);
 
-    Set<String> replicationServers = newSet("localhost:" + rsPort);
-      fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500);
-      if (startListen)
-        fakeReplicationDomain.startListenService();
+    fakeReplicationDomain.startPublishService(config);
+    if (startListen)
+      fakeReplicationDomain.startListenService();
 
-      // Test connection
-      assertTrue(fakeReplicationDomain.isConnected());
+    // Test connection
+    assertTrue(fakeReplicationDomain.isConnected());
     // Check connected server port
     HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
     assertEquals(rd.getPort(), rsPort);
 
-      return fakeReplicationDomain;
+    return fakeReplicationDomain;
+  }
+
+  private DomainFakeCfg newFakeCfg(int serverId, int rsPort) throws Exception
+  {
+    DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
+    DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverId, newSortedSet("localhost:" + rsPort));
+    fakeCfg.setHeartbeatInterval(1000);
+    fakeCfg.setChangetimeHeartbeatInterval(500);
+    return fakeCfg;
   }
 
   /**
@@ -2103,9 +2108,10 @@
       // Create and connect DS 2 to RS 1
       // Assured mode: SR
       ServerState serverState = fakeRd1.getServerState();
-      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+      ReplicationDomainCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID));
+      fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
         DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO, serverState, true, 100);
+              REPLY_OK_DS_SCENARIO, serverState, true);
 
       // Wait for connections to be established
       waitForStableTopo(fakeRd1, 1, 1);
@@ -3181,9 +3187,11 @@
         TIMEOUT_DS_SCENARIO);
 
       // DS 2 connected to RS 1 with low window to easily put it in DEGRADED status
-      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+      DomainFakeCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID));
+      config.setWindowSize(2);
+      fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
         DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO, new ServerState(), false, 2);
+        REPLY_OK_DS_SCENARIO, new ServerState(), false);
 
       // Wait for connections to be finished
       // DS must see expected numbers of DSs/RSs
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 2360491..38218af 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -1615,14 +1615,16 @@
     try
     {
       // Create broker on o=test
-      server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
-          100, replicationServerPort, brokerSessionTimeout, true);
-      server01.setChangeTimeHeartbeatInterval(100); //ms
+      DomainFakeCfg config1 = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
+      config1.setChangetimeHeartbeatInterval(100); // ms
+      server01 = openReplicationSession(config1, replicationServerPort,
+          brokerSessionTimeout, true, getGenerationId(TEST_ROOT_DN), null);
 
       // Create broker on o=test2
-      server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
-          100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
-      server02.setChangeTimeHeartbeatInterval(100); //ms
+      DomainFakeCfg config2 = newFakeCfg(TEST_ROOT_DN2, SERVER_ID_2, replicationServerPort);
+      config2.setChangetimeHeartbeatInterval(100); //ms
+      server02 = openReplicationSession(config2, replicationServerPort,
+          brokerSessionTimeout, true, EMPTY_DN_GENID, null);
 
       int ts = 1;
       // Produce update 1
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 6f26f6c..d91de26 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -356,8 +356,9 @@
 
     // Connect to the replicationServer using the state created above.
     try {
-      broker = openReplicationSession(TEST_ROOT_DN,
-          3, 100, replicationServerPort, 5000, state);
+      broker = new ReplicationBroker(null, state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort),
+          getGenerationId(TEST_ROOT_DN), getReplSessionSecurity());
+      connect(broker, replicationServerPort, 5000);
 
       ReplicationMsg receivedMsg = broker.receive();
       broker.updateWindowAfterReplay();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 08cc1b1..78b23ba 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -30,11 +30,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Set;
+import java.util.SortedSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.config.ConfigException;
+import org.opends.server.replication.plugin.DomainFakeCfg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
@@ -68,24 +69,32 @@
 
   private long generationID = 1;
 
-  public FakeReplicationDomain(DN baseDN, int serverID,
-      Set<String> replicationServers, int window, long heartbeatInterval,
-      BlockingQueue<UpdateMsg> queue) throws ConfigException
+  private FakeReplicationDomain(DN baseDN, int serverID,
+      SortedSet<String> replicationServers, long heartbeatInterval)
+      throws ConfigException
   {
     super(baseDN, serverID, 100);
-    startPublishService(replicationServers, window, heartbeatInterval, 500);
+    DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
+    fakeCfg.setHeartbeatInterval(heartbeatInterval);
+    fakeCfg.setChangetimeHeartbeatInterval(500);
+    startPublishService(fakeCfg);
     startListenService();
+  }
+
+  public FakeReplicationDomain(DN baseDN, int serverID,
+      SortedSet<String> replicationServers, long heartbeatInterval,
+      BlockingQueue<UpdateMsg> queue) throws ConfigException
+  {
+    this(baseDN, serverID, replicationServers, heartbeatInterval);
     this.queue = queue;
   }
 
   public FakeReplicationDomain(DN baseDN, int serverID,
-      Set<String> replicationServers, int window, long heartbeatInterval,
+      SortedSet<String> replicationServers, long heartbeatInterval,
       String exportString, StringBuilder importString, int exportedEntryCount)
       throws ConfigException
   {
-    super(baseDN, serverID, 100);
-    startPublishService(replicationServers, window, heartbeatInterval, 500);
-    startListenService();
+    this(baseDN, serverID, replicationServers, heartbeatInterval);
     this.exportString = exportString;
     this.importString = importString;
     this.exportedEntryCount = exportedEntryCount;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index 87148d7..9191fd3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -30,11 +30,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Set;
+import java.util.SortedSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.config.ConfigException;
+import org.opends.server.replication.plugin.DomainFakeCfg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
@@ -53,14 +54,18 @@
    * A blocking queue that is used to send the UpdateMsg received from the
    * Replication Service.
    */
-  private BlockingQueue<UpdateMsg> queue = null;
+  private BlockingQueue<UpdateMsg> queue;
 
   public FakeStressReplicationDomain(DN baseDN, int serverID,
-      Set<String> replicationServers, int window, long heartbeatInterval,
+      SortedSet<String> replicationServers, long heartbeatInterval,
       BlockingQueue<UpdateMsg> queue) throws ConfigException
   {
     super(baseDN, serverID, 100);
-    startPublishService(replicationServers, window, heartbeatInterval, 500);
+    final DomainFakeCfg fakeCfg =
+        new DomainFakeCfg(baseDN, serverID, replicationServers);
+    fakeCfg.setHeartbeatInterval(heartbeatInterval);
+    fakeCfg.setChangetimeHeartbeatInterval(500);
+    startPublishService(fakeCfg);
     startListenService();
     this.queue = queue;
   }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 3c8190c..e2158ea 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -93,16 +93,15 @@
       replServer2 = createReplicationServer(replServerID2, replServerPort2,
           "ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1);
 
-      Set<String> servers = newSet("localhost:" + replServerPort1);
-
+      SortedSet<String> servers = newSortedSet("localhost:" + replServerPort1);
       BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
       domain1 = new FakeReplicationDomain(
-          testService, domain1ServerId, servers, 100, 1000, rcvQueue1);
+          testService, domain1ServerId, servers, 1000, rcvQueue1);
 
-      Set<String> servers2 = newSet("localhost:" + replServerPort2);
+      SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2);
       BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
       domain2 = new FakeReplicationDomain(
-          testService, domain2ServerId, servers2, 100, 1000, rcvQueue2);
+          testService, domain2ServerId, servers2, 1000, rcvQueue2);
 
       Thread.sleep(500);
 
@@ -216,10 +215,10 @@
       replServer1 = createReplicationServer(replServerID1, replServerPort,
           "ReplicationDomainTestDb", 100000, "localhost:" + replServerPort);
 
-      Set<String> servers = newSet("localhost:" + replServerPort);
+      SortedSet<String> servers = newSortedSet("localhost:" + replServerPort);
       BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
       domain1 = new FakeReplicationDomain(
-          testService, domain1ServerId, servers, 1000, 100000, rcvQueue1);
+          testService, domain1ServerId, servers, 100000, rcvQueue1);
 
 
       /*
@@ -317,7 +316,7 @@
 
       replServer = createReplicationServer(replServerID, replServerPort,
           "exportAndImportData", 100);
-      Set<String> servers = newSet("localhost:" + replServerPort);
+      SortedSet<String> servers = newSortedSet("localhost:" + replServerPort);
 
       StringBuilder exportedDataBuilder = new StringBuilder();
       for (int i =0; i<ENTRYCOUNT; i++)
@@ -326,13 +325,11 @@
       }
       String exportedData=exportedDataBuilder.toString();
       domain1 = new FakeReplicationDomain(
-          testService, serverId1, servers,
-          100, 0, exportedData, null, ENTRYCOUNT);
+          testService, serverId1, servers, 0, exportedData, null, ENTRYCOUNT);
 
       StringBuilder importedData = new StringBuilder();
       domain2 = new FakeReplicationDomain(
-          testService, serverId2, servers, 100, 0,
-          null, importedData, 0);
+          testService, serverId2, servers, 0, null, importedData, 0);
 
       /*
        * Trigger a total update from domain1 to domain2.
@@ -394,8 +391,8 @@
       replServer2 = createReplicationServer(replServerID2, replServerPort2,
           "exportAndImportservice2", 100, "localhost:" + replServerPort1);
 
-      Set<String> servers1 = newSet("localhost:" + replServerPort1);
-      Set<String> servers2 = newSet("localhost:" + replServerPort2);
+      SortedSet<String> servers1 = newSortedSet("localhost:" + replServerPort1);
+      SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2);
 
       StringBuilder exportedDataBuilder = new StringBuilder();
       for (int i =0; i<ENTRYCOUNT; i++)
@@ -404,13 +401,11 @@
       }
       String exportedData=exportedDataBuilder.toString();
       domain1 = new FakeReplicationDomain(
-          testService, 1, servers1,
-          100, 0, exportedData, null, ENTRYCOUNT);
+          testService, 1, servers1, 0, exportedData, null, ENTRYCOUNT);
 
       StringBuilder importedData = new StringBuilder();
       domain2 = new FakeReplicationDomain(
-          testService, 2, servers2, 100, 0,
-          null, importedData, 0);
+          testService, 2, servers2, 0, null, importedData, 0);
 
       domain2.initializeFromRemote(1);
 
@@ -467,16 +462,15 @@
 
     try
     {
-      SortedSet<String> servers = new TreeSet<String>();
-      servers.add(HOST1 + SENDERPORT);
-      servers.add(HOST2 + RECEIVERPORT);
+      SortedSet<String> servers =
+          newSortedSet(HOST1 + SENDERPORT, HOST2 + RECEIVERPORT);
 
       replServer = createReplicationServer(replServerID, SENDERPORT,
           "ReplicationDomainTestDb", 100, servers);
 
       BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
       domain1 = new FakeStressReplicationDomain(
-          testService, 2, servers, 100, 1000, rcvQueue1);
+          testService, 2, servers, 1000, rcvQueue1);
 
       System.out.println("waiting");
       Thread.sleep(1000000000);
@@ -501,16 +495,15 @@
 
     try
     {
-      SortedSet<String> servers = new TreeSet<String>();
-      servers.add(HOST1 + SENDERPORT);
-      servers.add(HOST2 + RECEIVERPORT);
+      SortedSet<String> servers =
+          newSortedSet(HOST1 + SENDERPORT, HOST2 + RECEIVERPORT);
 
       replServer = createReplicationServer(replServerID, RECEIVERPORT,
           "ReplicationDomainTestDb", 100, servers);
 
       BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
       domain1 = new FakeStressReplicationDomain(
-          testService, 1, servers, 100, 100000, rcvQueue1);
+          testService, 1, servers, 100000, rcvQueue1);
       /*
        * Trigger a total update from domain1 to domain2.
        * Check that the exported data is correctly received on domain2.

--
Gitblit v1.10.0