From cb1bb5d131addd27e2927ec90cc572a8c4d40f80 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 09 Jan 2014 09:52:23 +0000
Subject: [PATCH] Front-port of r10098.

---
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java             |  126 ++----
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |  256 +++++-------
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java    |   26 
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java       |   27 
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |  100 ----
 opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java                                    |   52 +-
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |  503 ++++++++++++------------
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java |   22 
 opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java                                               |   21 
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java       |   14 
 10 files changed, 470 insertions(+), 677 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java b/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
index fcc036d..fb42a43 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,12 +22,11 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.common;
 
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 /**
  * This class holds information about a DS connected to the topology. This
@@ -35,8 +34,7 @@
  * messages, to keep every member (RS or DS) of the topology aware of the DS
  * topology.
  * <p>
- * This class is almost immutable, because it does not copy the List and Set
- * passed into the ctor.
+ * @Immutable
  */
 public final class DSInfo
 {
@@ -68,7 +66,6 @@
   private final Set<String> eclIncludesForDeletes;
 
 
-
   /**
    * Creates a new instance of DSInfo with every given info.
    *
@@ -101,8 +98,8 @@
   public DSInfo(int dsId, String dsUrl, int rsId, long generationId,
       ServerStatus status, boolean assuredFlag,
       AssuredMode assuredMode, byte safeDataLevel, byte groupId,
-      List<String> refUrls, Set<String> eclIncludes,
-      Set<String> eclIncludesForDeletes, short protocolVersion)
+      Collection<String> refUrls, Collection<String> eclIncludes,
+      Collection<String> eclIncludesForDeletes, short protocolVersion)
   {
     this.dsId = dsId;
     this.dsUrl = dsUrl;
@@ -113,9 +110,11 @@
     this.assuredMode = assuredMode;
     this.safeDataLevel = safeDataLevel;
     this.groupId = groupId;
-    this.refUrls = refUrls;
-    this.eclIncludes = eclIncludes;
-    this.eclIncludesForDeletes = eclIncludesForDeletes;
+    this.refUrls = Collections.unmodifiableList(new ArrayList<String>(refUrls));
+    this.eclIncludes =
+        Collections.unmodifiableSet(new HashSet<String>(eclIncludes));
+    this.eclIncludesForDeletes =
+        Collections.unmodifiableSet(new HashSet<String>(eclIncludesForDeletes));
     this.protocolVersion = protocolVersion;
   }
 
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 596b373..8db4b9c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -77,7 +77,6 @@
 import static org.opends.messages.ToolMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.AssuredMode.*;
 import static org.opends.server.replication.plugin.EntryHistorical.*;
 import static org.opends.server.replication.protocol.OperationContext.*;
 import static org.opends.server.replication.service.ReplicationMonitor.*;
@@ -186,7 +185,6 @@
   private final PersistentServerState state;
   private int numReplayedPostOpCalled = 0;
 
-  private volatile long generationId = -1;
   private volatile boolean generationIdSavedStatus = false;
 
   private final CSNGenerator generator;
@@ -227,7 +225,6 @@
   private final SortedMap<CSN, FakeOperation> replayOperations =
     new TreeMap<CSN, FakeOperation>();
 
-  private ReplicationDomainCfg config;
   private ExternalChangelogDomain eclDomain;
 
   /**
@@ -471,11 +468,8 @@
   public LDAPReplicationDomain(ReplicationDomainCfg configuration,
       BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
   {
-    super(configuration.getBaseDN(),
-          configuration.getServerId(),
-          configuration.getInitializationWindowSize());
+    super(configuration, -1);
 
-    this.config = configuration;
     this.updateToReplayQueue = updateToReplayQueue;
 
     // Get assured configuration
@@ -484,12 +478,7 @@
     // Get fractional configuration
     fractionalConfig = new FractionalConfig(getBaseDN());
     readFractionalConfig(configuration, false);
-
-    setGroupId((byte)configuration.getGroupId());
-    setURLs(configuration.getReferralsUrl());
-
     storeECLConfiguration(configuration);
-
     solveConflictFlag = isSolveConflict(configuration);
 
     Backend backend = retrievesBackend(getBaseDN());
@@ -551,76 +540,6 @@
   }
 
   /**
-   * Gets and stores the assured replication configuration parameters. Returns
-   * a boolean indicating if the passed configuration has changed compared to
-   * previous values and the changes require a reconnection.
-   * @param configuration The configuration object
-   * @param allowReconnection Tells if one must reconnect if significant changes
-   *        occurred
-   */
-  private void readAssuredConfig(ReplicationDomainCfg configuration,
-    boolean allowReconnection)
-  {
-    final boolean needReconnection = needReconnection(configuration);
-
-    // Disconnect if required: changing configuration values before
-    // disconnection would make assured replication used immediately and
-    // disconnection could cause some timeouts error.
-    if (needReconnection && allowReconnection)
-      disableService();
-
-    switch (configuration.getAssuredType())
-    {
-    case NOT_ASSURED:
-      setAssured(false);
-      break;
-    case SAFE_DATA:
-      setAssured(true);
-      setAssuredMode(AssuredMode.SAFE_DATA_MODE);
-      break;
-    case SAFE_READ:
-      setAssured(true);
-      setAssuredMode(AssuredMode.SAFE_READ_MODE);
-      break;
-    }
-    setAssuredSdLevel((byte) configuration.getAssuredSdLevel());
-    setAssuredTimeout(configuration.getAssuredTimeout());
-
-    // Reconnect if required
-    if (needReconnection && allowReconnection)
-      enableService();
-  }
-
-  private boolean needReconnection(ReplicationDomainCfg cfg)
-  {
-    switch (cfg.getAssuredType())
-    {
-    case NOT_ASSURED:
-      if (isAssured())
-      {
-        return true;
-      }
-      break;
-    case SAFE_DATA:
-      if (!isAssured() || getAssuredMode() == SAFE_READ_MODE)
-      {
-        return true;
-      }
-      break;
-    case SAFE_READ:
-      if (!isAssured() || getAssuredMode() == SAFE_DATA_MODE)
-      {
-        return true;
-      }
-      break;
-    }
-
-    return isAssured()
-        && getAssuredMode() == SAFE_DATA_MODE
-        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
-  }
-
-  /**
    * Sets the error message id to be used when online import is stopped with
    * error by the fractional replication ldif import plugin.
    * @param importErrorMessageId The message to use.
@@ -686,7 +605,8 @@
     }
 
     // Disable service if configuration changed
-    if (needReconnection && allowReconnection)
+    final boolean needRestart = needReconnection && allowReconnection;
+    if (needRestart)
     {
       disableService();
     }
@@ -713,7 +633,7 @@
     }
 
     // Reconnect if required
-    if (needReconnection && allowReconnection)
+    if (needRestart)
       enableService();
   }
 
@@ -1622,7 +1542,7 @@
 
     // FIXME should the next call use the initWindow parameter rather than the
     // instance variable?
-    super.initializeRemote(target, requestorID, initTask, this.initWindow);
+    super.initializeRemote(target, requestorID, initTask, getInitWindow());
   }
 
   /**
@@ -2377,7 +2297,7 @@
       DirectoryServer.deregisterAlertGenerator(this);
 
       // stop the ReplicationDomain
-      stopDomain();
+      disableService();
     }
 
     // wait for completion of the persistentServerState thread.
@@ -3412,14 +3332,6 @@
     return genId;
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public long getGenerationID()
-  {
-    return generationId;
-  }
-
-
   /**
    * Run a modify operation to update the entry whose DN is given as
    * a parameter with the generationID information.
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
index 9b83896..87913ce 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -22,17 +22,14 @@
  *
  *
  *      Copyright 2008-2009 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.protocols.asn1.ASN1;
@@ -43,6 +40,7 @@
 import org.opends.server.types.ByteSequenceReader;
 import org.opends.server.types.ByteString;
 import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
 
 /**
  * This message is used by DS to confirm a RS he wants to connect to him (open
@@ -60,19 +58,18 @@
  */
 public class StartSessionMsg extends ReplicationMsg
 {
-  // The list of referrals URLs to the sending DS
-  private List<String> referralsURLs = new ArrayList<String>();
-  // The initial status the DS starts with
+  /** The list of referrals URLs to the sending DS. */
+  private final List<String> referralsURLs = new ArrayList<String>();
+  /** The initial status the DS starts with. */
   private ServerStatus status = ServerStatus.INVALID_STATUS;
-  // Assured replication enabled on DS or not
-  private boolean assuredFlag = false;
-  // DS assured mode (relevant if assured replication enabled)
+  /** Assured replication enabled on DS or not. */
+  private boolean assuredFlag;
+  /** DS assured mode (relevant if assured replication enabled). */
   private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
-  // DS safe data level (relevant if assured mode is safe data)
-  private byte safeDataLevel = (byte) 1;
+  /** DS safe data level (relevant if assured mode is safe data). */
+  private byte safeDataLevel = 1;
 
   private Set<String> eclIncludes = new HashSet<String>();
-
   private Set<String> eclIncludesForDeletes = new HashSet<String>();
 
   /**
@@ -103,10 +100,10 @@
    * @param assuredMode Assured type
    * @param safeDataLevel Assured mode safe data level
    */
-  public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
+  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs,
     boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel)
   {
-    this.referralsURLs = referralsURLs;
+    this.referralsURLs.addAll(referralsURLs);
     this.status = status;
     this.assuredFlag = assuredFlag;
     this.assuredMode = assuredMode;
@@ -119,9 +116,9 @@
    * @param status Status we are starting with
    * @param referralsURLs Referrals URLs to be used by peer DSs
    */
-  public StartSessionMsg(ServerStatus status, List<String> referralsURLs)
+  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs)
   {
-    this.referralsURLs = referralsURLs;
+    this.referralsURLs.addAll(referralsURLs);
     this.status = status;
     this.assuredFlag = false;
   }
@@ -130,9 +127,7 @@
   // Msg encoding
   // ============
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public byte[] getBytes(short reqProtocolVersion)
     throws UnsupportedEncodingException
@@ -334,7 +329,6 @@
 
       /* Read the referrals URLs */
       int pos = 5;
-      referralsURLs = new ArrayList<String>();
       while (pos < in.length)
       {
         /*
@@ -373,24 +367,18 @@
     return status;
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public String toString()
   {
-    String urls = "";
-    for (String s : referralsURLs)
-    {
-      urls += s + " | ";
-    }
-    return ("StartSessionMsg content:\nstatus: " + status +
+    String urls = StaticUtils.collectionToString(referralsURLs, " | ");
+    return "StartSessionMsg content:\nstatus: " + status +
       "\nassuredFlag: " + assuredFlag +
       "\nassuredMode: " + assuredMode +
       "\nsafeDataLevel: " + safeDataLevel +
       "\nreferralsURLs: " + urls +
       "\nEclIncludes " + eclIncludes +
-      "\nEclIncludeForDeletes: " + eclIncludesForDeletes);
+      "\nEclIncludeForDeletes: " + eclIncludesForDeletes;
   }
 
   /**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index b01feaa..7f41ec9 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -39,6 +39,7 @@
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.task.Task;
@@ -56,6 +57,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.AssuredMode.*;
 import static org.opends.server.replication.common.StatusMachine.*;
 
 /**
@@ -100,7 +102,7 @@
  *   implementation using methods {@link #initializeRemote(int)}
  *   or {@link #initializeFromRemote(int)}.
  * <p>
- *   At shutdown time, the {@link #stopDomain()} method should be called to
+ *   At shutdown time, the {@link #disableService()} method should be called to
  *   cleanly stop the replication service.
  */
 public abstract class ReplicationDomain
@@ -115,25 +117,21 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /** The configuration of the replication domain. */
+  protected volatile ReplicationDomainCfg config;
   /**
-   *  The baseDN for the Replication Service.
-   *  All Replication Domain using this baseDN will be connected
-   *  through the Replication Service.
+   * The assured configuration of the replication domain. It is a duplicate of
+   * {@link #config} because of its update model.
+   *
+   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
    */
-  private final DN baseDN;
-
-  /**
-   * The identifier of this Replication Domain inside the
-   * Replication Service.
-   * Each Domain must use a unique ServerID.
-   */
-  private final int serverID;
+  private volatile ReplicationDomainCfg assuredConfig;
 
   /**
    * The ReplicationBroker that is used by this ReplicationDomain to
    * connect to the ReplicationService.
    */
-  protected ReplicationBroker broker = null;
+  protected ReplicationBroker broker;
 
   /**
    * This Map is used to store all outgoing assured messages in order
@@ -158,33 +156,6 @@
   private volatile DirectoryThread listenerThread = null;
 
   /**
-   * A Map used to store all the ReplicationDomains created on this server.
-   */
-  private static Map<DN, ReplicationDomain> domains =
-      new HashMap<DN, ReplicationDomain>();
-
-  /*
-   * Assured mode properties
-   */
-  /** Whether assured mode is enabled for this domain. */
-  private boolean assured = false;
-  /** Assured sub mode (used when assured is true). */
-  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
-  /** Safe Data level (used when assuredMode is SAFE_DATA). */
-  private byte assuredSdLevel = 1;
-  /** The timeout in ms that should be used, when waiting for assured acks. */
-  private long assuredTimeout = 2000;
-
-  /** Group id. */
-  private byte groupId = 1;
-  /**
-   * Referrals urls to be published to other servers of the topology.
-   * <p>
-   * TODO: fill that with all currently opened urls if no urls configured
-   */
-  private final List<String> refUrls = new ArrayList<String>();
-
-  /**
    * A set of counters used for Monitoring.
    */
   private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
@@ -265,15 +236,6 @@
   private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
     new HashMap<Integer,Integer>();
 
-  /**
-   * Window size used during initialization .. between
-   * - the initializer/exporter DS that listens/waits acknowledges and that
-   *   slows down data msg publishing based on the slowest server
-   * - and each initialized/importer DS that publishes acknowledges each
-   *   WINDOW/2 data msg received.
-   */
-  protected final int initWindow;
-
   /* Status related monitoring fields */
 
   /**
@@ -311,6 +273,12 @@
   private final Object sessionLock = new Object();
 
   /**
+   * The generationId for this replication domain. It is made of a hash of the
+   * 1000 first entries for this domain.
+   */
+  protected volatile long generationId;
+
+  /**
    * Returns the {@link CSNGenerator} that will be used to
    * generate {@link CSN} for this domain.
    *
@@ -325,46 +293,39 @@
   /**
    * Creates a ReplicationDomain with the provided parameters.
    *
-   * @param baseDN     The identifier of the Replication Domain to which
-   *                   this object is participating.
-   * @param serverID   The identifier of the server that is participating
-   *                   to the Replication Domain.
-   *                   This identifier should be different for each server that
-   *                   is participating to a given Replication Domain.
-   * @param initWindow Window used during initialization.
+   * @param config
+   *          The configuration object for this ReplicationDomain
+   * @param generationId
+   *          the generation of this ReplicationDomain
    */
-  public ReplicationDomain(DN baseDN, int serverID, int initWindow)
+  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
   {
-    this.baseDN = baseDN;
-    this.serverID = serverID;
-    this.initWindow = initWindow;
+    this.config = config;
+    this.assuredConfig = config;
+    this.generationId = generationId;
     this.state = new ServerState();
-    this.generator = new CSNGenerator(serverID, state);
-
-    domains.put(baseDN, this);
+    this.generator = new CSNGenerator(getServerId(), state);
   }
 
   /**
-   * Creates a ReplicationDomain with the provided parameters.
-   * (for unit test purpose only)
+   * Creates a ReplicationDomain with the provided parameters. (for unit test
+   * purpose only)
    *
-   * @param baseDN     The identifier of the Replication Domain to which
-   *                   this object is participating.
-   * @param serverID   The identifier of the server that is participating
-   *                   to the Replication Domain.
-   *                   This identifier should be different for each server that
-   *                   is participating to a given Replication Domain.
-   * @param serverState The serverState to use
+   * @param config
+   *          The configuration object for this ReplicationDomain
+   * @param generationId
+   *          the generation of this ReplicationDomain
+   * @param serverState
+   *          The serverState to use
    */
-  public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
+  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
+      ServerState serverState)
   {
-    this.baseDN = baseDN;
-    this.serverID = serverID;
-    this.initWindow = 100;
+    this.config = config;
+    this.assuredConfig = config;
+    this.generationId = generationId;
     this.state = serverState;
-    this.generator = new CSNGenerator(serverID, state);
-
-    domains.put(baseDN, this);
+    this.generator = new CSNGenerator(getServerId(), state);
   }
 
   /**
@@ -387,7 +348,7 @@
     if (!isValidInitialStatus(initStatus))
     {
       logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
-          getBaseDNString(), Integer.toString(serverID)));
+          getBaseDNString(), Integer.toString(getServerId())));
     }
     else
     {
@@ -406,7 +367,7 @@
   private void receiveChangeStatus(ChangeStatusMsg csMsg)
   {
     if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDN +
+      TRACER.debugInfo("Replication domain " + getBaseDN() +
         " received change status message:\n" + csMsg);
 
     ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -416,7 +377,7 @@
     if (event == StatusMachineEvent.INVALID_EVENT)
     {
       logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
-          getBaseDNString(), Integer.toString(serverID)));
+          getBaseDNString(), Integer.toString(getServerId())));
       return;
     }
 
@@ -468,13 +429,24 @@
   }
 
   /**
-   * Returns the base DN of this ReplicationDomain.
+   * Returns the current config of this ReplicationDomain.
+   *
+   * @return the config
+   */
+  protected ReplicationDomainCfg getConfig()
+  {
+    return config;
+  }
+
+  /**
+   * Returns the base DN of this ReplicationDomain. All Replication Domain using
+   * this baseDN will be connected through the Replication Service.
    *
    * @return The base DN of this ReplicationDomain
    */
   public DN getBaseDN()
   {
-    return baseDN;
+    return config.getBaseDN();
   }
 
   /**
@@ -484,16 +456,32 @@
    */
   public String getBaseDNString()
   {
-    return baseDN.toNormalizedString();
+    return getBaseDN().toNormalizedString();
   }
 
   /**
-   * Get the server ID.
+   * Get the server ID. The identifier of this Replication Domain inside the
+   * Replication Service. Each Domain must use a unique ServerID.
+   *
    * @return The server ID.
    */
   public int getServerId()
   {
-    return serverID;
+    return config.getServerId();
+  }
+
+  /**
+   * Window size used during initialization .. between - the
+   * initializer/exporter DS that listens/waits acknowledges and that slows down
+   * data msg publishing based on the slowest server - and each
+   * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
+   * received.
+   *
+   * @return the initWindow
+   */
+  protected int getInitWindow()
+  {
+    return config.getInitializationWindowSize();
   }
 
   /**
@@ -502,25 +490,38 @@
    */
   public boolean isAssured()
   {
-    return assured;
+    return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
+        || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
   }
 
   /**
-   * Gives the mode for the assured replication of the domain.
+   * Gives the mode for the assured replication of the domain. Only used when
+   * assured is true).
+   *
    * @return The mode for the assured replication of the domain.
    */
   public AssuredMode getAssuredMode()
   {
-    return assuredMode;
+    switch (assuredConfig.getAssuredType())
+    {
+    case SAFE_DATA:
+    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
+      return AssuredMode.SAFE_DATA_MODE;
+    case SAFE_READ:
+      return AssuredMode.SAFE_READ_MODE;
+    }
+    return null; // should never happen
   }
 
   /**
-   * Gives the assured level of the replication of the domain.
+   * Gives the assured Safe Data level of the replication of the domain. (used
+   * when assuredMode is SAFE_DATA).
+   *
    * @return The assured level of the replication of the domain.
    */
   public byte getAssuredSdLevel()
   {
-    return assuredSdLevel;
+    return (byte) assuredConfig.getAssuredSdLevel();
   }
 
   /**
@@ -529,7 +530,7 @@
    */
   public long getAssuredTimeout()
   {
-    return assuredTimeout;
+    return assuredConfig.getAssuredTimeout();
   }
 
   /**
@@ -538,16 +539,20 @@
    */
   public byte getGroupId()
   {
-    return groupId;
+    return (byte) config.getGroupId();
   }
 
   /**
-   * Gets the referrals URLs this domain publishes.
+   * Gets the referrals URLs this domain publishes. Referrals urls to be
+   * published to other servers of the topology.
+   * <p>
+   * TODO: fill that with all currently opened urls if no urls configured
+   *
    * @return The referrals URLs this domain publishes.
    */
-  public List<String> getRefUrls()
+  public Set<String> getRefUrls()
   {
-    return refUrls;
+    return config.getReferralsUrl();
   }
 
   /**
@@ -673,67 +678,6 @@
   }
 
   /**
-   * Set the list of Referrals that should be returned when an
-   * operation needs to be redirected to this server.
-   *
-   * @param referralsUrl The list of referrals.
-   */
-  public void setURLs(Set<String> referralsUrl)
-  {
-      this.refUrls.addAll(referralsUrl);
-  }
-
-  /**
-   * Set the timeout of the assured replication.
-   *
-   * @param assuredTimeout the timeout of the assured replication.
-   */
-  public void setAssuredTimeout(long assuredTimeout)
-  {
-    this.assuredTimeout = assuredTimeout;
-  }
-
-  /**
-   * Sets the groupID.
-   *
-   * @param groupId The groupID.
-   */
-  public void setGroupId(byte groupId)
-  {
-    this.groupId = groupId;
-  }
-
-  /**
-   * Sets the level of assured replication.
-   *
-   * @param assuredSdLevel The level of assured replication.
-   */
-  public void setAssuredSdLevel(byte assuredSdLevel)
-  {
-    this.assuredSdLevel = assuredSdLevel;
-  }
-
-  /**
-   * Sets the assured replication mode.
-   *
-   * @param dataMode The assured replication mode.
-   */
-  public void setAssuredMode(AssuredMode dataMode)
-  {
-    this.assuredMode = dataMode;
-  }
-
-  /**
-   * Sets assured replication.
-   *
-   * @param assured A boolean indicating if assured replication should be used.
-   */
-  public void setAssured(boolean assured)
-  {
-    this.assured = assured;
-  }
-
-  /**
    * Receives an update message from the replicationServer.
    * The other types of messages are processed in an opaque way for the caller.
    * Also responsible for updating the list of pending changes
@@ -802,8 +746,8 @@
             */
             if (debugEnabled())
               TRACER.debugInfo(
-                  "[IE] processErrorMsg:" + this.serverID +
-                  " baseDN: " + this.baseDN +
+                  "[IE] processErrorMsg:" + getServerId() +
+                  " baseDN: " + getBaseDN() +
                   " Error Msg received: " + errorMsg);
 
             if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -873,10 +817,9 @@
     }
 
     numRcvdUpdates.incrementAndGet();
-     byte rsGroupId = broker.getRsGroupId();
     if (update.isAssured()
-        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
-        && rsGroupId == groupId)
+        && broker.getRsGroupId() == getGroupId()
+        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
     {
       assuredSrReceivedUpdates.incrementAndGet();
     }
@@ -949,7 +892,7 @@
         requested servers. Log problem
         */
         logError(NOTE_DS_RECEIVED_ACK_ERROR.get(
-            getBaseDNString(), Integer.toString(serverID),
+            getBaseDNString(), Integer.toString(getServerId()),
             update.toString(), ack.errorsToString()));
 
         List<Integer> failedServers = ack.getFailedServers();
@@ -1048,7 +991,7 @@
      */
     public ExportThread(int serverIdToInitialize, int initWindow)
     {
-      super("Export thread from serverId=" + serverID + " to serverId="
+      super("Export thread from serverId=" + getServerId() + " to serverId="
           + serverIdToInitialize);
       this.serverIdToInitialize = serverIdToInitialize;
       this.initWindow = initWindow;
@@ -1379,7 +1322,7 @@
   public void initializeRemote(int target, Task initTask)
   throws DirectoryException
   {
-    initializeRemote(target, this.serverID, initTask, this.initWindow);
+    initializeRemote(target, getServerId(), initTask, getInitWindow());
   }
 
   /**
@@ -1418,7 +1361,7 @@
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
       logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
-          countEntries(), getBaseDNString(), serverID));
+          countEntries(), getBaseDNString(), getServerId()));
 
       for (DSInfo dsi : getReplicasList())
       {
@@ -1436,8 +1379,8 @@
     }
     else
     {
-      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
-          countEntries(), getBaseDNString(), serverID, serverToInitialize));
+      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
+          getBaseDNString(), getServerId(), serverToInitialize));
 
       ieContext.startList.add(serverToInitialize);
 
@@ -1471,7 +1414,7 @@
 
         // Send start message to the peer
         InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
-            getBaseDN(), serverID, serverToInitialize,
+            getBaseDN(), getServerId(), serverToInitialize,
             serverRunningTheTask, ieContext.entryCount, initWindow);
 
         broker.publish(initTargetMsg);
@@ -1492,8 +1435,8 @@
         exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
 
         // Notify the peer of the success
-        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
-        broker.publish(doneMsg);
+        broker.publish(
+            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
       }
       catch(DirectoryException exportException)
       {
@@ -1595,12 +1538,12 @@
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
       logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
-          getBaseDNString(), serverID, cause));
+          getBaseDNString(), getServerId(), cause));
     }
     else
     {
       logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
-          getBaseDNString(), serverID, serverToInitialize, cause));
+          getBaseDNString(), getServerId(), serverToInitialize, cause));
     }
 
 
@@ -1894,10 +1837,8 @@
             // send the ack of flow control mgmt
             if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
             {
-              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
-                  this.serverID,
-                  entryMsg.getSenderID(),
-                  ieContext.msgCnt);
+              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
+                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
               broker.publish(amsg, false);
               if (debugEnabled())
               {
@@ -1945,7 +1886,7 @@
               Message.raw(Category.SYNC, Severity.NOTICE,
                   ERR_INIT_EXPORTER_DISCONNECTION.get(
                       getBaseDNString(),
-                      Integer.toString(this.serverID),
+                      Integer.toString(getServerId()),
                       Integer.toString(ieContext.importSource)));
             ieContext.setExceptionIfNoneSet(new DirectoryException(
                 ResultCode.OTHER, errMsg));
@@ -2017,7 +1958,7 @@
 
     // build the message
     EntryMsg entryMessage = new EntryMsg(
-        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
+        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
         ++ieContext.msgCnt);
 
     // Waiting the slowest loop
@@ -2219,7 +2160,7 @@
       ieContext.initializeTask = initTask;
       ieContext.attemptCnt = 0;
       ieContext.initReqMsgSent = new InitializeRequestMsg(
-          getBaseDN(), serverID, source, this.initWindow);
+          getBaseDN(), getServerId(), source, getInitWindow());
 
       // Publish Init request msg
       broker.publish(ieContext.initReqMsgSent);
@@ -2281,14 +2222,14 @@
     try
     {
       // Log starting
-      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
-          getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID));
+      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(),
+          initTargetMsgReceived.getSenderID(), getServerId()));
 
       // Go into full update status
       setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
 
       // Acquire an import context if no already done (and initialize).
-      if (initTargetMsgReceived.getInitiatorID() != this.serverID)
+      if (initTargetMsgReceived.getInitiatorID() != getServerId())
       {
         /*
         The initTargetMsgReceived is for an import initiated by the remote
@@ -2418,7 +2359,8 @@
       finally
       {
         Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
-            getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
+            getBaseDNString(), initTargetMsgReceived.getSenderID(),
+            getServerId(),
             (ieContext.getException() == null ? ""
                 : ieContext.getException().getLocalizedMessage()));
         logError(msg);
@@ -2458,7 +2400,7 @@
     if (newStatus == ServerStatus.INVALID_STATUS)
     {
       logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
-          Integer.toString(serverID), status.toString(), event.toString()));
+          String.valueOf(getServerId()), status.toString(), event.toString()));
       return;
     }
 
@@ -2472,13 +2414,11 @@
         resetMonitoringCounters();
       }
 
-      // Store new status
       status = newStatus;
-
       if (debugEnabled())
       {
-        TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
-            + status);
+        TRACER.debugInfo("Replication domain " + getBaseDN()
+            + " new status is: " + status);
       }
 
       // Perform whatever actions are needed to apply properties for being
@@ -2560,10 +2500,8 @@
     // check that at least one ReplicationServer did change its generation-id
     checkGenerationID(-1);
 
-    // Reconnect to the Replication Server so that it adopt our
-    // GenerationID.
-    disableService();
-    enableService();
+    // Reconnect to the Replication Server so that it adopts our GenerationID.
+    restartService();
 
     // wait for the domain to reconnect.
     int count = 0;
@@ -2597,8 +2535,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
-          + " resetGenerationId " + generationIdNewValue);
+      TRACER.debugInfo("Server id " + getServerId() + " and domain "
+          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
     }
 
     ResetGenerationIdMsg genIdMessage =
@@ -2607,7 +2545,7 @@
     if (!isConnected())
     {
       Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
-          Integer.toString(serverID),
+          Integer.toString(getServerId()),
           Long.toString(genIdMessage.getGenerationId()));
       throw new DirectoryException(ResultCode.OTHER, message);
     }
@@ -3110,33 +3048,19 @@
   }
 
   /**
-   * Definitively stops the Replication Service.
-   */
-  public void stopDomain()
-  {
-    disableService();
-    domains.remove(baseDN);
-  }
-
-  /**
    * Change some ReplicationDomain parameters.
    *
    * @param config
    *          The new configuration that this domain should now use.
    */
-  public void changeConfig(ReplicationDomainCfg config)
+  protected void changeConfig(ReplicationDomainCfg config)
   {
-    this.groupId = (byte) config.getGroupId();
-
     if (broker != null && broker.changeConfig(config))
     {
-      disableService();
-      enableService();
+      restartService();
     }
   }
 
-
-
   /**
    * Applies a configuration change to the attributes which should be be
    * included in the ECL.
@@ -3149,15 +3073,19 @@
   public void changeConfig(Set<String> includeAttributes,
       Set<String> includeAttributesForDeletes)
   {
-    if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
-        && broker != null)
+    final boolean attrsModified = setEclIncludes(
+        getServerId(), includeAttributes, includeAttributesForDeletes);
+    if (attrsModified && broker != null)
     {
-      disableService();
-      enableService();
+      restartService();
     }
   }
 
-
+  private void restartService()
+  {
+    disableService();
+    enableService();
+  }
 
   /**
    * This method should trigger an export of the replicated data.
@@ -3236,15 +3164,13 @@
     Send an ack if it was requested and the group id is the same of the RS
     one. Only Safe Read mode makes sense in DS for returning an ack.
     */
-    byte rsGroupId = broker.getRsGroupId();
     // Assured feature is supported starting from replication protocol V2
     if (msg.isAssured()
       && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
     {
-      AssuredMode msgAssuredMode = msg.getAssuredMode();
-      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
       {
-        if (rsGroupId == groupId)
+        if (broker.getRsGroupId() == getGroupId())
         {
           // Send the ack
           AckMsg ackMsg = new AckMsg(msg.getCSN());
@@ -3255,7 +3181,7 @@
             ackMsg.setHasReplayError(true);
             //   -> replay error occurred in our server
             List<Integer> idList = new ArrayList<Integer>();
-            idList.add(serverID);
+            idList.add(getServerId());
             ackMsg.setFailedServers(idList);
           }
           broker.publish(ackMsg);
@@ -3269,10 +3195,11 @@
           }
         }
       }
-      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
       {
-        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
-            msgAssuredMode.toString(), getBaseDNString(), msg.toString()));
+        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()),
+            msg.getAssuredMode().toString(), getBaseDNString(),
+            msg.toString()));
       }
         // Nothing to do in Assured safe data mode, only RS ack updates.
     }
@@ -3303,23 +3230,22 @@
    */
   protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
   {
-    byte rsGroupId = broker.getRsGroupId();
     /*
      * If assured configured, set message accordingly to request an ack in the
      * right assured mode.
-     * No ack requested for a RS with a different group id. Assured
-     * replication supported for the same locality, i.e: a topology working in
-     * the same
-     * geographical location). If we are connected to a RS which is not in our
-     * locality, no need to ask for an ack.
+     * No ack requested for a RS with a different group id.
+     * Assured replication supported for the same locality,
+     * i.e: a topology working in the same geographical location).
+     * If we are connected to a RS which is not in our locality,
+     * no need to ask for an ack.
      */
-    if (assured && rsGroupId == groupId)
+    if (needsAck())
     {
       msg.setAssured(true);
-      msg.setAssuredMode(assuredMode);
-      if (assuredMode == AssuredMode.SAFE_DATA_MODE)
+      msg.setAssuredMode(getAssuredMode());
+      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
       {
-        msg.setSafeDataLevel(assuredSdLevel);
+        msg.setSafeDataLevel(getAssuredSdLevel());
       }
 
       // Add the assured message to the list of update that are waiting for acks
@@ -3327,6 +3253,11 @@
     }
   }
 
+  private boolean needsAck()
+  {
+    return isAssured() && broker.getRsGroupId() == getGroupId();
+  }
+
   /**
    * Wait for the processing of an assured message after it has been sent, if
    * assured replication is configured, otherwise, do nothing.
@@ -3340,14 +3271,10 @@
   protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
     throws TimeoutException
   {
-    byte rsGroupId = broker.getRsGroupId();
-
-    // If assured mode configured, wait for acknowledgment for the just sent
-    // message
-    if (assured && rsGroupId == groupId)
+    if (needsAck())
     {
       // Increment assured replication monitoring counters
-      switch (assuredMode)
+      switch (getAssuredMode())
       {
         case SAFE_READ_MODE:
           assuredSrSentUpdates.incrementAndGet();
@@ -3383,12 +3310,12 @@
           if (debugEnabled())
           {
             TRACER.debugInfo("waitForAck method interrupted for replication " +
-              "baseDN: " + baseDN);
+              "baseDN: " + getBaseDN());
           }
           break;
         }
         // Timeout ?
-        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+        if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout())
         {
           /*
           Timeout occurred, be sure that ack is not being received and if so,
@@ -3424,8 +3351,8 @@
           }
 
           throw new TimeoutException("No ack received for message csn: " + csn
-              + " and replication domain: " + baseDN + " after "
-              + assuredTimeout + " ms.");
+              + " and replication domain: " + getBaseDN() + " after "
+              + getAssuredTimeout() + " ms.");
         }
       }
     }
@@ -3481,7 +3408,7 @@
     {
       // This exception may only be raised if assured replication is enabled
       logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
-          Long.toString(assuredTimeout), update.toString()));
+          Long.toString(getAssuredTimeout()), update.toString()));
     }
   }
 
@@ -3493,11 +3420,25 @@
    *
    * @return The GenerationID.
    */
-  public abstract long getGenerationID();
+  public long getGenerationID()
+  {
+    return generationId;
+  }
 
   /**
-   * Subclasses should use this method to add additional monitoring
-   * information in the ReplicationDomain.
+   * Sets the generationId for this replication domain.
+   *
+   * @param generationId
+   *          the generationId to set
+   */
+  public void setGenerationID(long generationId)
+  {
+    this.generationId = generationId;
+  }
+
+  /**
+   * Subclasses should use this method to add additional monitoring information
+   * in the ReplicationDomain.
    *
    * @return Additional monitoring attributes that will be added in the
    *         ReplicationDomain monitoring entry.
@@ -3711,13 +3652,69 @@
    */
   public CSN getLastLocalChange()
   {
-    return state.getCSN(serverID);
+    return state.getCSN(getServerId());
+  }
+
+  /**
+   * Gets and stores the assured replication configuration parameters. Returns a
+   * boolean indicating if the passed configuration has changed compared to
+   * previous values and the changes require a reconnection.
+   *
+   * @param config
+   *          The configuration object
+   * @param allowReconnection
+   *          Tells if one must reconnect if significant changes occurred
+   */
+  protected void readAssuredConfig(ReplicationDomainCfg config,
+      boolean allowReconnection)
+  {
+    // Disconnect if required: changing configuration values before
+    // disconnection would make assured replication used immediately and
+    // disconnection could cause some timeouts error.
+    if (needReconnection(config) && allowReconnection)
+    {
+      disableService();
+
+      assuredConfig = config;
+
+      enableService();
+    }
+  }
+
+  private boolean needReconnection(ReplicationDomainCfg cfg)
+  {
+    final AssuredMode assuredMode = getAssuredMode();
+    switch (cfg.getAssuredType())
+    {
+    case NOT_ASSURED:
+      if (isAssured())
+      {
+        return true;
+      }
+      break;
+    case SAFE_DATA:
+      if (!isAssured() || assuredMode == SAFE_READ_MODE)
+      {
+        return true;
+      }
+      break;
+    case SAFE_READ:
+      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
+      {
+        return true;
+      }
+      break;
+    }
+
+    return isAssured()
+        && assuredMode == SAFE_DATA_MODE
+        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
   }
 
   /** {@inheritDoc} */
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
+    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
   }
 }
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index 2a9b6c1..cae6348 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -20,13 +20,14 @@
  *
  * CDDL HEADER END
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
@@ -39,12 +40,9 @@
 public class DummyReplicationDomain extends ReplicationDomain
 {
 
-  private final long generationId;
-
   public DummyReplicationDomain(long generationId)
   {
-    super(null, -1, 0);
-    this.generationId = generationId;
+    super(new DomainFakeCfg(null, -1, new TreeSet<String>()), generationId);
   }
 
   @Override
@@ -92,10 +90,4 @@
     return false;
   }
 
-  @Override
-  public long getGenerationID()
-  {
-    return this.generationId;
-  }
-
 }
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index bc13f0f..6028476 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -538,6 +538,16 @@
     replicationServer = new ReplicationServer(conf);
   }
 
+  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
+      SortedSet<String> replicationServers, long heartbeatInterval)
+  {
+    DomainFakeCfg fakeCfg =
+        new DomainFakeCfg(baseDN, serverID, replicationServers);
+    fakeCfg.setHeartbeatInterval(heartbeatInterval);
+    fakeCfg.setChangetimeHeartbeatInterval(500);
+    return fakeCfg;
+  }
+
   /**
    * This class is the minimum implementation of a Concrete ReplicationDomain
    * used to be able to connect to the RS with a known genid. Also to be able
@@ -561,18 +571,14 @@
      */
     private StringBuilder importString;
     private int exportedEntryCount;
-    private long generationID = -1;
 
     public FakeReplicationDomain(DN baseDN, int serverID,
         SortedSet<String> replicationServers, long heartbeatInterval,
         long generationId) throws ConfigException
     {
-      super(baseDN, serverID, 100);
-      generationID = generationId;
-      DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
-      fakeCfg.setHeartbeatInterval(heartbeatInterval);
-      fakeCfg.setChangetimeHeartbeatInterval(500);
-      startPublishService(fakeCfg);
+      super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval),
+          generationId);
+      startPublishService(getConfig());
       startListenService();
     }
 
@@ -604,12 +610,6 @@
     }
 
     @Override
-    public long getGenerationID()
-    {
-      return generationID;
-    }
-
-    @Override
     protected void importBackend(InputStream input) throws DirectoryException
     {
       byte[] buffer = new byte[1000];
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 90ba002..38cba02 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -61,7 +61,7 @@
  */
 public class TopologyViewTest extends ReplicationTestCase
 {
-  // Server id definitions
+  /** Server id definitions */
   private static final int DS1_ID = 1;
   private static final int DS2_ID = 2;
   private static final int DS3_ID = 3;
@@ -72,7 +72,7 @@
   private static final int RS2_ID = 52;
   private static final int RS3_ID = 53;
 
-  // Group id definitions
+  /** Group id definitions */
   private static final int DS1_GID = 1;
   private static final int DS2_GID = 1;
   private static final int DS3_GID = 2;
@@ -83,7 +83,7 @@
   private static final int RS2_GID = 2;
   private static final int RS3_GID = 3;
 
-  // Assured conf definitions
+  /** Assured conf definitions */
   private static final AssuredType DS1_AT = AssuredType.NOT_ASSURED;
   private static final int DS1_SDL = -1;
   private static SortedSet<String> DS1_RU = new TreeSet<String>();
@@ -140,7 +140,7 @@
   private ReplicationServer rs2 = null;
   private ReplicationServer rs3 = null;
 
-  // The tracer object for the debug logger
+  /** The tracer object for the debug logger */
   private static final DebugTracer TRACER = getTracer();
 
   private void debugInfo(String s)
@@ -460,7 +460,7 @@
     return replicationDomain;
   }
 
-  // Definitions of steps for the test case
+  /** Definitions of steps for the test case */
   private static final int STEP_1 = 1;
   private static final int STEP_2 = 2;
   private static final int STEP_3 = 3;
@@ -824,17 +824,13 @@
       }
 
     // Perform necessary conversions
-    boolean assuredFlag = (assuredType != AssuredType.NOT_ASSURED);
-    AssuredMode assMode = ( (assuredType == AssuredType.SAFE_READ) ?
-      AssuredMode.SAFE_READ_MODE : AssuredMode.SAFE_DATA_MODE);
-    List<String> urls = new ArrayList<String>();
-    for(String str : refUrls)
-    {
-      urls.add(str);
-    }
+    boolean assuredFlag = assuredType != AssuredType.NOT_ASSURED;
+    AssuredMode assMode = assuredType == AssuredType.SAFE_READ
+        ? AssuredMode.SAFE_READ_MODE
+        : AssuredMode.SAFE_DATA_MODE;
 
     return new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
-       (byte)assuredSdLevel, groupId, urls, eclIncludes, eclIncludes, protocolVersion);
+       (byte)assuredSdLevel, groupId, refUrls, eclIncludes, eclIncludes, protocolVersion);
   }
 
   /**
@@ -1018,36 +1014,20 @@
      /**
       * Get the topo view of the current analyzed DS
       */
-     List<DSInfo> internalDsList = rd.getReplicasList();
      // Add info for DS itself:
      // we need to clone the list as we don't want to modify the list kept
      // inside the DS.
-     List<DSInfo> dsList = new ArrayList<DSInfo>();
-     for (DSInfo aDsInfo : internalDsList)
-     {
-       dsList.add(aDsInfo);
-     }
-     int dsId = rd.getServerId();
-     int rsId = rd.getRsServerId();
-     ServerStatus status = rd.getStatus();
-     boolean assuredFlag = rd.isAssured();
-     AssuredMode assuredMode = rd.getAssuredMode();
-     byte safeDataLevel = rd.getAssuredSdLevel();
-     byte groupId = rd.getGroupId();
-     List<String> refUrls = rd.getRefUrls();
-     Set<String> eclInclude = rd.getEclIncludes();
-     Set<String> eclIncludeForDeletes = rd.getEclIncludesForDeletes();
-     short protocolVersion = ProtocolVersion.getCurrentVersion();
-     DSInfo dsInfo = new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assuredMode,
-       safeDataLevel, groupId, refUrls, eclInclude, eclIncludeForDeletes, protocolVersion);
-     dsList.add(dsInfo);
+      final DSInfo dsInfo = new DSInfo(rd.getServerId(), "dummy:1234", rd.getRsServerId(),
+          TEST_DN_WITH_ROOT_ENTRY_GENID,
+          rd.getStatus(),
+          rd.isAssured(), rd.getAssuredMode(), rd.getAssuredSdLevel(),
+          rd.getGroupId(), rd.getRefUrls(),
+          rd.getEclIncludes(), rd.getEclIncludesForDeletes(),
+          ProtocolVersion.getCurrentVersion());
+      final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList());
+      dsList.add(dsInfo);
 
      TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
-
-     /**
-      * Compare to what is the expected view
-      */
-
      assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId);
    }
   }
@@ -1058,8 +1038,8 @@
    */
   private class TopoView
   {
-    private List<DSInfo> dsList = null;
-    private List<RSInfo> rsList = null;
+    private List<DSInfo> dsList;
+    private List<RSInfo> rsList;
 
     public TopoView(List<DSInfo> dsList, List<RSInfo> rsList)
     {
@@ -1073,20 +1053,23 @@
     @Override
     public boolean equals(Object obj)
     {
-      assertNotNull(obj);
-      assertFalse(obj.getClass() != this.getClass());
-
-      TopoView topoView = (TopoView) obj;
-
-      // Check dsList
-      if (topoView.dsList.size() != dsList.size())
+      if (obj == null || getClass() != obj.getClass())
         return false;
-      for (DSInfo dsInfo : topoView.dsList)
+      TopoView other = (TopoView) obj;
+      return checkLists(dsList, other.dsList)
+          && checkLists(rsList, other.rsList);
+    }
+
+    private boolean checkLists(List<?> list, List<?> otherList)
+    {
+      if (otherList.size() != list.size())
+        return false;
+      for (Object otherObj : otherList)
       {
         int found = 0;
-        for (DSInfo thisDsInfo : dsList)
+        for (Object thisObj : list)
         {
-          if (thisDsInfo.equals(dsInfo))
+          if (thisObj.equals(otherObj))
             found++;
         }
         // Not found
@@ -1096,52 +1079,33 @@
         assertFalse(found > 1);
       // Ok, found exactly once in the list, examine next structure
       }
-
-      // Check rsList
-      if (topoView.rsList.size() != rsList.size())
-        return false;
-      for (RSInfo rsInfo : topoView.rsList)
-      {
-        int found = 0;
-        for (RSInfo thisRsInfo : rsList)
-        {
-          if (thisRsInfo.equals(rsInfo))
-            found++;
-        }
-        // Not found
-        if (found == 0)
-          return false;
-        // Should never see twice as rsInfo structure in a dsList
-        assertFalse(found > 1);
-      // Ok, found exactly once in the list, examine next structure
-      }
-
       return true;
     }
 
     @Override
     public String toString()
     {
-      String dsStr = "";
+      final StringBuilder sb = new StringBuilder("TopoView:");
+      sb.append("\n----------------------------\n");
+      sb.append("CONNECTED DS SERVERS:\n");
       for (DSInfo dsInfo : dsList)
       {
-        dsStr += dsInfo.toString() + "\n----------------------------\n";
+        sb.append(dsInfo).append("\n----------------------------\n");
       }
-
-      String rsStr = "";
+      sb.append("CONNECTED RS SERVERS:\n");
       for (RSInfo rsInfo : rsList)
       {
-        rsStr += rsInfo.toString() + "\n----------------------------\n";
+        sb.append(rsInfo).append("\n----------------------------\n");
       }
+      return sb.toString();
+    }
 
-      return ("TopoView:" +
-        "\n----------------------------\n" + "CONNECTED DS SERVERS:\n" + dsStr +
-        "CONNECTED RS SERVERS:\n" + rsStr);
+    private TopologyViewTest getOuterType()
+    {
+      return TopologyViewTest.this;
     }
   }
 
-
-
   private String getHostPort(int port)
   {
     return LOCAL_HOST_NAME + ":" + port;
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 5c140f6..f778fb4 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -38,6 +38,7 @@
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
 import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
@@ -56,6 +57,7 @@
 import org.testng.annotations.Test;
 
 import static java.util.Arrays.*;
+
 import static org.assertj.core.api.Assertions.*;
 import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.loggers.ErrorLogger.*;
@@ -239,14 +241,12 @@
    * (no server state constructor version)
    */
   private FakeReplicationDomain createFakeReplicationDomain(int serverId,
-    int groupId, int rsId, long generationId, boolean assured,
-    AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
-    int scenario)
-        throws Exception
+      int groupId, int rsId, long generationId, AssuredMode assuredMode,
+      int safeDataLevel, long assuredTimeout, int scenario) throws Exception
   {
-    ReplicationDomainCfg config = newFakeCfg(serverId, getRsPort(rsId), groupId);
-    return createFakeReplicationDomain(config, groupId, rsId, generationId, assured,
-      assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true);
+    return createFakeReplicationDomain(serverId, groupId, rsId, generationId,
+        assuredMode, safeDataLevel, assuredTimeout, scenario,
+        new ServerState(), true);
   }
 
   private int getRsPort(int rsId)
@@ -284,17 +284,23 @@
    * @throws Exception
    */
   private FakeReplicationDomain createFakeReplicationDomain(
-      ReplicationDomainCfg config, int groupId, int rsId, long generationId,
-      boolean assured, AssuredMode assuredMode, int safeDataLevel,
+      int serverId, int groupId, int rsId, long generationId,
+      AssuredMode assuredMode, int safeDataLevel,
       long assuredTimeout, int scenario, ServerState serverState,
       boolean startListen) throws Exception
   {
-    // Set port to right real RS according to its id
-    int rsPort = getRsPort(rsId);
+    final DomainFakeCfg config = newDomainConfig(serverId, groupId, rsId,
+        assuredMode, safeDataLevel, assuredTimeout);
+    return createFakeReplicationDomain(config, rsId, generationId, scenario,
+        serverState, startListen);
+  }
 
-    FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
-        config.getBaseDN(), config.getServerId(), generationId, (byte) groupId,
-        assured, assuredMode, (byte) safeDataLevel, assuredTimeout, scenario, serverState);
+  private FakeReplicationDomain createFakeReplicationDomain(
+      ReplicationDomainCfg config, int rsId, long generationId, int scenario,
+      ServerState serverState, boolean startListen) throws Exception
+  {
+    FakeReplicationDomain fakeReplicationDomain =
+        new FakeReplicationDomain(config, generationId, scenario, serverState);
 
     fakeReplicationDomain.startPublishService(config);
     if (startListen)
@@ -304,20 +310,42 @@
     assertTrue(fakeReplicationDomain.isConnected());
     // Check connected server port
     HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
-    assertEquals(rd.getPort(), rsPort);
+    assertEquals(rd.getPort(), getRsPort(rsId));
 
     return fakeReplicationDomain;
   }
 
-  private DomainFakeCfg newFakeCfg(int serverId, int rsPort, int groupId) throws Exception
+  private DomainFakeCfg newDomainConfig(int serverId, int groupId, int rsId,
+      AssuredMode assuredMode, int safeDataLevel, long assuredTimeout)
+      throws DirectoryException
   {
-    DomainFakeCfg fakeCfg = new DomainFakeCfg(
-        DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort), groupId);
+    final int rsPort = getRsPort(rsId);
+    final DomainFakeCfg fakeCfg = new DomainFakeCfg(
+        DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort),
+        getAssuredType(assuredMode),
+        safeDataLevel, groupId, assuredTimeout, new TreeSet<String>());
     fakeCfg.setHeartbeatInterval(1000);
     fakeCfg.setChangetimeHeartbeatInterval(500);
     return fakeCfg;
   }
 
+  private AssuredType getAssuredType(AssuredMode assuredMode)
+  {
+    if (assuredMode == null)
+    {
+      return AssuredType.NOT_ASSURED;
+    }
+
+    switch (assuredMode)
+    {
+    case SAFE_READ_MODE:
+      return AssuredType.SAFE_READ;
+    case SAFE_DATA_MODE:
+      return AssuredType.SAFE_DATA;
+    }
+    throw new RuntimeException("Not implemented for " + assuredMode);
+  }
+
   /**
    * Creates and connects a new fake replication server, using the passed scenario.
    */
@@ -412,9 +440,8 @@
   {
     /** The scenario this DS is expecting */
     private int scenario = -1;
-    private long generationId = -1;
 
-    private CSNGenerator gen = null;
+    private CSNGenerator gen;
 
     /** False if a received update had assured parameters not as expected */
     private boolean everyUpdatesAreOk = true;
@@ -437,28 +464,14 @@
      * behavior upon reception of updates)
      * @throws org.opends.server.config.ConfigException
      */
-    public FakeReplicationDomain(
-      DN baseDN,
-      int serverID,
-      long generationId,
-      byte groupId,
-      boolean assured,
-      AssuredMode assuredMode,
-      byte safeDataLevel,
-      long assuredTimeout,
-      int scenario,
-      ServerState serverState) throws ConfigException
+    public FakeReplicationDomain(ReplicationDomainCfg config,
+        long generationId, int scenario, ServerState serverState)
+        throws ConfigException
     {
-      super(baseDN, serverID, serverState);
-      this.generationId = generationId;
-      setGroupId(groupId);
-      setAssured(assured);
-      setAssuredMode(assuredMode);
-      setAssuredSdLevel(safeDataLevel);
-      setAssuredTimeout(assuredTimeout);
+      super(config, generationId, serverState);
       this.scenario = scenario;
 
-      gen = new CSNGenerator(serverID, 0L);
+      gen = new CSNGenerator(config.getServerId(), 0L);
     }
 
     public boolean receivedUpdatesOk()
@@ -490,12 +503,6 @@
     }
 
     @Override
-    public long getGenerationID()
-    {
-      return generationId;
-    }
-
-    @Override
     protected void importBackend(InputStream input) throws DirectoryException
     {
       // Not needed for this test
@@ -549,8 +556,8 @@
         debugInfo("Fake DS " + getServerId() + " received update assured flag is wrong: " + updateMsg);
         ok = false;
       }
-      if (updateMsg.getAssuredMode() !=  getAssuredMode())
-      {
+      if (isAssured() && updateMsg.getAssuredMode() != getAssuredMode())
+      { // it is meaningless to have different assured mode when UpdateMsg is not assured
         debugInfo("Fake DS " + getServerId() + " received update assured mode is wrong: " + updateMsg);
         ok = false;
       }
@@ -645,7 +652,7 @@
      * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
      * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
      * @param groupId our group id
-     * @param baseDN the basedn we connect with, to the real RS
+     * @param baseDN the baseDN we connect with, to the real RS
      * @param generationId the generation id we use at connection to real RS
      */
     public FakeReplicationServer(int port, int serverId, boolean assured,
@@ -1022,9 +1029,7 @@
        */
 
       // Create real RS 1
-      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
-        testCase, 0);
-      assertNotNull(rs1);
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
 
       /*
        * Start main DS (the one which sends updates)
@@ -1033,8 +1038,7 @@
       // Create and connect fake domain 1 to RS 1
       // Assured mode: SD, level 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, mainDsGid, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       /*
        * Start one other fake DS
@@ -1049,8 +1053,7 @@
         // this would timeout. If main DS group id is not the same as the real RS one,
         // the update will even not come to real RS as assured
         fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
-          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
-          TIMEOUT_DS_SCENARIO);
+            DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
       }
 
       /*
@@ -1363,9 +1366,7 @@
        */
 
       // Create real RS 1
-      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
-        testCase, 0);
-      assertNotNull(rs1);
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
 
       /*
        * Start main DS (the one which sends updates)
@@ -1373,8 +1374,7 @@
 
       // Create and connect fake domain 1 to RS 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       /*
        * Start one other fake DS
@@ -1384,8 +1384,7 @@
       if (otherFakeDS)
       {
         fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
-          otherFakeDsGenId, false, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
-          TIMEOUT_DS_SCENARIO);
+            otherFakeDsGenId, null, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
       }
 
       /*
@@ -1873,9 +1872,7 @@
        */
 
       // Create real RS 1
-      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
-        testCase, 0);
-      assertNotNull(rs1);
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
 
       /*
        * Start fake RS to make the RS have the default generation id
@@ -1976,20 +1973,13 @@
        */
       int numberOfRealRSs = 3;
 
-      // Create real RS 1
+      // Create real RS 1, 2, 3
       rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs1);
-
-      // Create real RS 2
       rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs2);
-
-      // Create real RS 3
       rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs3);
 
       /*
        * Start DS that will send updates
@@ -1998,8 +1988,7 @@
       // Wait for RSs to connect together
       // Create and connect fake domain 1 to RS 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // Wait for RSs connections to be finished
       // DS must see expected numbers of RSs
@@ -2049,9 +2038,7 @@
        */
 
       // Create real RS 1
-      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
-        testCase, 0);
-      assertNotNull(rs1);
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
 
       /*******************
        * Start main DS 1 (the one which sends updates)
@@ -2060,8 +2047,7 @@
       // Create and connect DS 1 to RS 1
       // Assured mode: SR
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       /*
        * Send a first assured safe read update
@@ -2089,10 +2075,9 @@
       // Create and connect DS 2 to RS 1
       // Assured mode: SR
       ServerState serverState = fakeRd1.getServerState();
-      ReplicationDomainCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID);
-      fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-              REPLY_OK_DS_SCENARIO, serverState, true);
+      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+          REPLY_OK_DS_SCENARIO, serverState, true);
 
       // Wait for connections to be established
       waitForStableTopo(fakeRd1, 1, 1);
@@ -2306,31 +2291,26 @@
        */
 
       // Create real RS 1
-      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
-        testCase, 0);
-      assertNotNull(rs1);
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
 
       /*
        * Start main DS 1 (the one which sends updates)
        */
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       /*
        * Start another fake DS 2 connected to RS
        */
       fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       /*
        * Start another fake DS 3 connected to RS
        */
       fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
-        otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID),
-        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        otherFakeDsScen);
+          otherFakeDsGenId, otherFakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null,
+          1, LONG_TIMEOUT, otherFakeDsScen);
 
       /*
        * Start fake RS (RS 1) connected to RS
@@ -2617,25 +2597,17 @@
        */
       int numberOfRealRSs = 4;
 
-      // Create real RS 1
+      // Create real RS 1, 2, 3
       rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs1);
-
-      // Create real RS 2
       rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs2);
-
-      // Create real RS 3
       rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs3);
 
       // Create real RS 4 (different GID 2)
       rs4 = createReplicationServer(RS4_ID, OTHER_GID_BIS, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs4);
 
       /*
        * Start DS 1 that will send assured updates
@@ -2644,8 +2616,7 @@
       // Wait for RSs to connect together
       // Create and connect fake domain 1 to RS 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // Wait for connections to be finished
       // DS must see expected numbers of DSs/RSs
@@ -2674,23 +2645,19 @@
 
       // DS 2 connected to RS 1
       fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       // DS 3 connected to RS 2
       fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, DEFAULT_GID, RS2_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       // DS 4 connected to RS 3
       fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, DEFAULT_GID, RS3_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       // DS 5 connected to RS 3
       fakeRDs[5] = createFakeReplicationDomain(FDS5_ID, DEFAULT_GID, RS3_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       /*
        * Start DSs that will not receive updates from DS 1 as assured because
@@ -2699,23 +2666,19 @@
 
       // DS 6 connected to RS 1
       fakeRDs[6] = createFakeReplicationDomain(FDS6_ID, OTHER_GID, RS1_ID,
-        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 7 connected to RS 2
       fakeRDs[7] = createFakeReplicationDomain(FDS7_ID, OTHER_GID, RS2_ID,
-        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 8 connected to RS 3
       fakeRDs[8] = createFakeReplicationDomain(FDS8_ID, OTHER_GID, RS3_ID,
-        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 9 (GID 2) connected to RS 4
       fakeRDs[9] = createFakeReplicationDomain(FDS9_ID, OTHER_GID_BIS, RS4_ID,
-        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       /*
        * Start DSs that will not receive updates from DS 1 because
@@ -2724,18 +2687,15 @@
 
       // DS 10 connected to RS 1
       fakeRDs[10] = createFakeReplicationDomain(FDS10_ID, DEFAULT_GID, RS1_ID,
-        OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 11 connected to RS 2
       fakeRDs[11] = createFakeReplicationDomain(FDS11_ID, DEFAULT_GID, RS2_ID,
-        OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 12 connected to RS 3
       fakeRDs[12] = createFakeReplicationDomain(FDS12_ID, DEFAULT_GID, RS3_ID,
-        OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // Wait for connections to be finished
       // DS must see expected numbers of DSs/RSs
@@ -2897,15 +2857,11 @@
        */
       int numberOfRealRSs = 2;
 
-      // Create real RS 1
+      // Create real RS 1, 2
       rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs1);
-
-      // Create real RS 2
       rs2 = createReplicationServer(RS2_ID, OTHER_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs2);
 
       /*
        * Start DSs with GID=DEFAULT_GID, connected to RS1
@@ -2913,13 +2869,11 @@
 
       // DS 1 connected to RS 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 2 connected to RS 1
       fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       /*
        * Start DSs with GID=OTHER_GID, connected to RS2
@@ -2927,13 +2881,11 @@
 
       // DS 3 connected to RS 2
       fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, OTHER_GID, RS2_ID,
-        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       // DS 4 connected to RS 3
       fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, OTHER_GID, RS2_ID,
-        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO);
+          DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
 
       // Wait for connections to be finished
       // DS must see expected numbers of DSs/RSs
@@ -3010,15 +2962,11 @@
        */
       int numberOfRealRSs = 2;
 
-      // Create real RS 1
+      // Create real RS 1, 2
       rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT + 1000, // Be sure DS2 timeout is seen from DS1
         testCase, numberOfRealRSs);
-      assertNotNull(rs1);
-
-      // Create real RS 2
       rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase, numberOfRealRSs);
-      assertNotNull(rs2);
 
       /*
        * Start 2 fake DSs
@@ -3026,13 +2974,12 @@
 
       // DS 1 connected to RS 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 2 connected to RS 2
       fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID,
-        fakeDsGenId, (fakeDsGid == DEFAULT_GID),
-        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen);
+          fakeDsGenId, fakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null,
+          1, LONG_TIMEOUT, fakeDsScen);
 
       // Wait for connections to be finished
       // DS must see expected numbers of DSs/RSs
@@ -3167,15 +3114,14 @@
 
       // DS 1 connected to RS 1
       fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        TIMEOUT_DS_SCENARIO);
+          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
 
       // DS 2 connected to RS 1 with low window to easily put it in DEGRADED status
-      DomainFakeCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID);
+      final DomainFakeCfg config = newDomainConfig(FDS2_ID, DEFAULT_GID, RS1_ID,
+          AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT);
       config.setWindowSize(2);
-      fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
-        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
-        REPLY_OK_DS_SCENARIO, new ServerState(), false);
+      fakeRDs[2] = createFakeReplicationDomain(config, RS1_ID, DEFAULT_GENID,
+          REPLY_OK_DS_SCENARIO, new ServerState(), false);
 
       // Wait for connections to be finished
       // DS must see expected numbers of DSs/RSs
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 601a39b..2108816 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2013 ForgeRock AS
+ *      Portions Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -65,19 +65,24 @@
 
   private int exportedEntryCount;
 
-  private long generationID = 1;
-
   private FakeReplicationDomain(DN baseDN, int serverID,
       SortedSet<String> replicationServers, int window, long heartbeatInterval)
       throws ConfigException
   {
-    super(baseDN, serverID, 100);
+    super(newConfig(baseDN, serverID, replicationServers, window,
+        heartbeatInterval), 1);
+    startPublishService(getConfig());
+    startListenService();
+  }
+
+  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
+      SortedSet<String> replicationServers, int window, long heartbeatInterval)
+  {
     DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
     fakeCfg.setHeartbeatInterval(heartbeatInterval);
     fakeCfg.setChangetimeHeartbeatInterval(500);
     fakeCfg.setWindowSize(window);
-    startPublishService(fakeCfg);
-    startListenService();
+    return fakeCfg;
   }
 
   public FakeReplicationDomain(DN baseDN, int serverID,
@@ -122,12 +127,6 @@
   }
 
   @Override
-  public long getGenerationID()
-  {
-    return generationID;
-  }
-
-  @Override
   protected void importBackend(InputStream input) throws DirectoryException
   {
     byte[] buffer = new byte[1000];
@@ -157,8 +156,4 @@
     return true;
   }
 
-  public void setGenerationID(long newGenerationID)
-  {
-    generationID = newGenerationID;
-  }
 }
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index b0a49c0..d6b5ca1 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2013 ForgeRock AS
+ *      Portions Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -58,14 +58,20 @@
       SortedSet<String> replicationServers, long heartbeatInterval,
       BlockingQueue<UpdateMsg> queue) throws ConfigException
   {
-    super(baseDN, serverID, 100);
+    super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), 1);
+    startPublishService(getConfig());
+    startListenService();
+    this.queue = queue;
+  }
+
+  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
+      SortedSet<String> replicationServers, long heartbeatInterval)
+  {
     final DomainFakeCfg fakeCfg =
         new DomainFakeCfg(baseDN, serverID, replicationServers);
     fakeCfg.setHeartbeatInterval(heartbeatInterval);
     fakeCfg.setChangetimeHeartbeatInterval(500);
-    startPublishService(fakeCfg);
-    startListenService();
-    this.queue = queue;
+    return fakeCfg;
   }
 
   private static final int IMPORT_SIZE = 100000000;
@@ -98,12 +104,6 @@
   }
 
   @Override
-  public long getGenerationID()
-  {
-    return 1;
-  }
-
-  @Override
   protected void importBackend(InputStream input) throws DirectoryException
   {
     long startDate = System.currentTimeMillis();

--
Gitblit v1.10.0