mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
09.52.2014 cb1bb5d131addd27e2927ec90cc572a8c4d40f80
Front-port of r10098.

Replaced/removed numerous fields and methods of ReplicationDomain by directly storing the config.
Changed a lot of subclasses due to this change.


ReplicationDomain.java:
Pulled up generationId, config fields, getGenerationID(), readAssuredConfig() and needReconnection() methods from LDAPReplicationDomain.
Replaced baseDN, serverID, groupId, refUrls, initWindow fields with new config field + encapsulated fields, updated getters, removed setters
Replaced assured, assuredMode, assuredSdLevel, assuredTimeout fields with new assuredConfig field + encapsulated fields, updated getters, removed setters
Removed domains field, never used.
Inlined stopDomain().
Added generationId to the ctor + implemented getGenerationID() + added setGenerationID().
Extracted method restartService(), needsAck().

LDAPReplicationDomain.java:
Pulled up generationId, config fields, getGenerationID(), readAssuredConfig() and needReconnection() methods to ReplicationDomain.
In ctor, consequence of the change to ReplicationDomain.

DSInfo.java:
Made it immutable.

StartSessionMsg.java:
Code cleanup.
Javadocs.


DummyReplicationDomain.java, FractionalReplicationTest.java, FakeReplicationDomain.java, FakeStressReplicationDomain.java:
Consequence of the changes to ReplicationDomain.
Pulled up generationId, getGenerationID(), setGeneration() to ReplicationDomain.
Called ReplicationDomain.getConfig().

TopologyViewTest.java:
Consequence of the change to ReplicationDomain.getRefUrls().
Extracted method checkLists().
Code cleanup.

AssuredReplicationServerTest.java:
Streamlined and simplified the createFakeReplicationDomain() methods + removed the boolean assured parameter.
Added getAssuredType(AssuredMode).
In FakeReplicationDomain, consequence of the changes to ReplicationDomain.
In checkUpdateAssuredParameters(), changed the assert a bit to match removal of the boolean assured parameter.
Removed useless calls to assertNotNull() after calling createReplicationServer() (result is never null).
Replaced newFakeCfg() by newDomainConfig().
10 files modified
1147 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java 21 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 100 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java 52 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java 503 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java 14 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 26 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java 126 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 256 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 27 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 22 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,12 +22,11 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
 * This class holds information about a DS connected to the topology. This
@@ -35,8 +34,7 @@
 * messages, to keep every member (RS or DS) of the topology aware of the DS
 * topology.
 * <p>
 * This class is almost immutable, because it does not copy the List and Set
 * passed into the ctor.
 * @Immutable
 */
public final class DSInfo
{
@@ -68,7 +66,6 @@
  private final Set<String> eclIncludesForDeletes;
  /**
   * Creates a new instance of DSInfo with every given info.
   *
@@ -101,8 +98,8 @@
  public DSInfo(int dsId, String dsUrl, int rsId, long generationId,
      ServerStatus status, boolean assuredFlag,
      AssuredMode assuredMode, byte safeDataLevel, byte groupId,
      List<String> refUrls, Set<String> eclIncludes,
      Set<String> eclIncludesForDeletes, short protocolVersion)
      Collection<String> refUrls, Collection<String> eclIncludes,
      Collection<String> eclIncludesForDeletes, short protocolVersion)
  {
    this.dsId = dsId;
    this.dsUrl = dsUrl;
@@ -113,9 +110,11 @@
    this.assuredMode = assuredMode;
    this.safeDataLevel = safeDataLevel;
    this.groupId = groupId;
    this.refUrls = refUrls;
    this.eclIncludes = eclIncludes;
    this.eclIncludesForDeletes = eclIncludesForDeletes;
    this.refUrls = Collections.unmodifiableList(new ArrayList<String>(refUrls));
    this.eclIncludes =
        Collections.unmodifiableSet(new HashSet<String>(eclIncludes));
    this.eclIncludesForDeletes =
        Collections.unmodifiableSet(new HashSet<String>(eclIncludesForDeletes));
    this.protocolVersion = protocolVersion;
  }
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -77,7 +77,6 @@
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.plugin.EntryHistorical.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.replication.service.ReplicationMonitor.*;
@@ -186,7 +185,6 @@
  private final PersistentServerState state;
  private int numReplayedPostOpCalled = 0;
  private volatile long generationId = -1;
  private volatile boolean generationIdSavedStatus = false;
  private final CSNGenerator generator;
@@ -227,7 +225,6 @@
  private final SortedMap<CSN, FakeOperation> replayOperations =
    new TreeMap<CSN, FakeOperation>();
  private ReplicationDomainCfg config;
  private ExternalChangelogDomain eclDomain;
  /**
@@ -471,11 +468,8 @@
  public LDAPReplicationDomain(ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
  {
    super(configuration.getBaseDN(),
          configuration.getServerId(),
          configuration.getInitializationWindowSize());
    super(configuration, -1);
    this.config = configuration;
    this.updateToReplayQueue = updateToReplayQueue;
    // Get assured configuration
@@ -484,12 +478,7 @@
    // Get fractional configuration
    fractionalConfig = new FractionalConfig(getBaseDN());
    readFractionalConfig(configuration, false);
    setGroupId((byte)configuration.getGroupId());
    setURLs(configuration.getReferralsUrl());
    storeECLConfiguration(configuration);
    solveConflictFlag = isSolveConflict(configuration);
    Backend backend = retrievesBackend(getBaseDN());
@@ -551,76 +540,6 @@
  }
  /**
   * Gets and stores the assured replication configuration parameters. Returns
   * a boolean indicating if the passed configuration has changed compared to
   * previous values and the changes require a reconnection.
   * @param configuration The configuration object
   * @param allowReconnection Tells if one must reconnect if significant changes
   *        occurred
   */
  private void readAssuredConfig(ReplicationDomainCfg configuration,
    boolean allowReconnection)
  {
    final boolean needReconnection = needReconnection(configuration);
    // Disconnect if required: changing configuration values before
    // disconnection would make assured replication used immediately and
    // disconnection could cause some timeouts error.
    if (needReconnection && allowReconnection)
      disableService();
    switch (configuration.getAssuredType())
    {
    case NOT_ASSURED:
      setAssured(false);
      break;
    case SAFE_DATA:
      setAssured(true);
      setAssuredMode(AssuredMode.SAFE_DATA_MODE);
      break;
    case SAFE_READ:
      setAssured(true);
      setAssuredMode(AssuredMode.SAFE_READ_MODE);
      break;
    }
    setAssuredSdLevel((byte) configuration.getAssuredSdLevel());
    setAssuredTimeout(configuration.getAssuredTimeout());
    // Reconnect if required
    if (needReconnection && allowReconnection)
      enableService();
  }
  private boolean needReconnection(ReplicationDomainCfg cfg)
  {
    switch (cfg.getAssuredType())
    {
    case NOT_ASSURED:
      if (isAssured())
      {
        return true;
      }
      break;
    case SAFE_DATA:
      if (!isAssured() || getAssuredMode() == SAFE_READ_MODE)
      {
        return true;
      }
      break;
    case SAFE_READ:
      if (!isAssured() || getAssuredMode() == SAFE_DATA_MODE)
      {
        return true;
      }
      break;
    }
    return isAssured()
        && getAssuredMode() == SAFE_DATA_MODE
        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
  }
  /**
   * Sets the error message id to be used when online import is stopped with
   * error by the fractional replication ldif import plugin.
   * @param importErrorMessageId The message to use.
@@ -686,7 +605,8 @@
    }
    // Disable service if configuration changed
    if (needReconnection && allowReconnection)
    final boolean needRestart = needReconnection && allowReconnection;
    if (needRestart)
    {
      disableService();
    }
@@ -713,7 +633,7 @@
    }
    // Reconnect if required
    if (needReconnection && allowReconnection)
    if (needRestart)
      enableService();
  }
@@ -1622,7 +1542,7 @@
    // FIXME should the next call use the initWindow parameter rather than the
    // instance variable?
    super.initializeRemote(target, requestorID, initTask, this.initWindow);
    super.initializeRemote(target, requestorID, initTask, getInitWindow());
  }
  /**
@@ -2377,7 +2297,7 @@
      DirectoryServer.deregisterAlertGenerator(this);
      // stop the ReplicationDomain
      stopDomain();
      disableService();
    }
    // wait for completion of the persistentServerState thread.
@@ -3412,14 +3332,6 @@
    return genId;
  }
  /** {@inheritDoc} */
  @Override
  public long getGenerationID()
  {
    return generationId;
  }
  /**
   * Run a modify operation to update the entry whose DN is given as
   * a parameter with the generationID information.
opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -22,17 +22,14 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
@@ -43,6 +40,7 @@
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
/**
 * This message is used by DS to confirm a RS he wants to connect to him (open
@@ -60,19 +58,18 @@
 */
public class StartSessionMsg extends ReplicationMsg
{
  // The list of referrals URLs to the sending DS
  private List<String> referralsURLs = new ArrayList<String>();
  // The initial status the DS starts with
  /** The list of referrals URLs to the sending DS. */
  private final List<String> referralsURLs = new ArrayList<String>();
  /** The initial status the DS starts with. */
  private ServerStatus status = ServerStatus.INVALID_STATUS;
  // Assured replication enabled on DS or not
  private boolean assuredFlag = false;
  // DS assured mode (relevant if assured replication enabled)
  /** Assured replication enabled on DS or not. */
  private boolean assuredFlag;
  /** DS assured mode (relevant if assured replication enabled). */
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // DS safe data level (relevant if assured mode is safe data)
  private byte safeDataLevel = (byte) 1;
  /** DS safe data level (relevant if assured mode is safe data). */
  private byte safeDataLevel = 1;
  private Set<String> eclIncludes = new HashSet<String>();
  private Set<String> eclIncludesForDeletes = new HashSet<String>();
  /**
@@ -103,10 +100,10 @@
   * @param assuredMode Assured type
   * @param safeDataLevel Assured mode safe data level
   */
  public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs,
    boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel)
  {
    this.referralsURLs = referralsURLs;
    this.referralsURLs.addAll(referralsURLs);
    this.status = status;
    this.assuredFlag = assuredFlag;
    this.assuredMode = assuredMode;
@@ -119,9 +116,9 @@
   * @param status Status we are starting with
   * @param referralsURLs Referrals URLs to be used by peer DSs
   */
  public StartSessionMsg(ServerStatus status, List<String> referralsURLs)
  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs)
  {
    this.referralsURLs = referralsURLs;
    this.referralsURLs.addAll(referralsURLs);
    this.status = status;
    this.assuredFlag = false;
  }
@@ -130,9 +127,7 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
@@ -334,7 +329,6 @@
      /* Read the referrals URLs */
      int pos = 5;
      referralsURLs = new ArrayList<String>();
      while (pos < in.length)
      {
        /*
@@ -373,24 +367,18 @@
    return status;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    String urls = "";
    for (String s : referralsURLs)
    {
      urls += s + " | ";
    }
    return ("StartSessionMsg content:\nstatus: " + status +
    String urls = StaticUtils.collectionToString(referralsURLs, " | ");
    return "StartSessionMsg content:\nstatus: " + status +
      "\nassuredFlag: " + assuredFlag +
      "\nassuredMode: " + assuredMode +
      "\nsafeDataLevel: " + safeDataLevel +
      "\nreferralsURLs: " + urls +
      "\nEclIncludes " + eclIncludes +
      "\nEclIncludeForDeletes: " + eclIncludesForDeletes);
      "\nEclIncludeForDeletes: " + eclIncludesForDeletes;
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
@@ -56,6 +57,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.common.StatusMachine.*;
/**
@@ -100,7 +102,7 @@
 *   implementation using methods {@link #initializeRemote(int)}
 *   or {@link #initializeFromRemote(int)}.
 * <p>
 *   At shutdown time, the {@link #stopDomain()} method should be called to
 *   At shutdown time, the {@link #disableService()} method should be called to
 *   cleanly stop the replication service.
 */
public abstract class ReplicationDomain
@@ -115,25 +117,21 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /** The configuration of the replication domain. */
  protected volatile ReplicationDomainCfg config;
  /**
   *  The baseDN for the Replication Service.
   *  All Replication Domain using this baseDN will be connected
   *  through the Replication Service.
   * The assured configuration of the replication domain. It is a duplicate of
   * {@link #config} because of its update model.
   *
   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
   */
  private final DN baseDN;
  /**
   * The identifier of this Replication Domain inside the
   * Replication Service.
   * Each Domain must use a unique ServerID.
   */
  private final int serverID;
  private volatile ReplicationDomainCfg assuredConfig;
  /**
   * The ReplicationBroker that is used by this ReplicationDomain to
   * connect to the ReplicationService.
   */
  protected ReplicationBroker broker = null;
  protected ReplicationBroker broker;
  /**
   * This Map is used to store all outgoing assured messages in order
@@ -158,33 +156,6 @@
  private volatile DirectoryThread listenerThread = null;
  /**
   * A Map used to store all the ReplicationDomains created on this server.
   */
  private static Map<DN, ReplicationDomain> domains =
      new HashMap<DN, ReplicationDomain>();
  /*
   * Assured mode properties
   */
  /** Whether assured mode is enabled for this domain. */
  private boolean assured = false;
  /** Assured sub mode (used when assured is true). */
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  /** Safe Data level (used when assuredMode is SAFE_DATA). */
  private byte assuredSdLevel = 1;
  /** The timeout in ms that should be used, when waiting for assured acks. */
  private long assuredTimeout = 2000;
  /** Group id. */
  private byte groupId = 1;
  /**
   * Referrals urls to be published to other servers of the topology.
   * <p>
   * TODO: fill that with all currently opened urls if no urls configured
   */
  private final List<String> refUrls = new ArrayList<String>();
  /**
   * A set of counters used for Monitoring.
   */
  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
@@ -265,15 +236,6 @@
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  /**
   * Window size used during initialization .. between
   * - the initializer/exporter DS that listens/waits acknowledges and that
   *   slows down data msg publishing based on the slowest server
   * - and each initialized/importer DS that publishes acknowledges each
   *   WINDOW/2 data msg received.
   */
  protected final int initWindow;
  /* Status related monitoring fields */
  /**
@@ -311,6 +273,12 @@
  private final Object sessionLock = new Object();
  /**
   * The generationId for this replication domain. It is made of a hash of the
   * 1000 first entries for this domain.
   */
  protected volatile long generationId;
  /**
   * Returns the {@link CSNGenerator} that will be used to
   * generate {@link CSN} for this domain.
   *
@@ -325,46 +293,39 @@
  /**
   * Creates a ReplicationDomain with the provided parameters.
   *
   * @param baseDN     The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param initWindow Window used during initialization.
   * @param config
   *          The configuration object for this ReplicationDomain
   * @param generationId
   *          the generation of this ReplicationDomain
   */
  public ReplicationDomain(DN baseDN, int serverID, int initWindow)
  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
  {
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.initWindow = initWindow;
    this.config = config;
    this.assuredConfig = config;
    this.generationId = generationId;
    this.state = new ServerState();
    this.generator = new CSNGenerator(serverID, state);
    domains.put(baseDN, this);
    this.generator = new CSNGenerator(getServerId(), state);
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   * (for unit test purpose only)
   * Creates a ReplicationDomain with the provided parameters. (for unit test
   * purpose only)
   *
   * @param baseDN     The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param serverState The serverState to use
   * @param config
   *          The configuration object for this ReplicationDomain
   * @param generationId
   *          the generation of this ReplicationDomain
   * @param serverState
   *          The serverState to use
   */
  public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
      ServerState serverState)
  {
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.initWindow = 100;
    this.config = config;
    this.assuredConfig = config;
    this.generationId = generationId;
    this.state = serverState;
    this.generator = new CSNGenerator(serverID, state);
    domains.put(baseDN, this);
    this.generator = new CSNGenerator(getServerId(), state);
  }
  /**
@@ -387,7 +348,7 @@
    if (!isValidInitialStatus(initStatus))
    {
      logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
          getBaseDNString(), Integer.toString(serverID)));
          getBaseDNString(), Integer.toString(getServerId())));
    }
    else
    {
@@ -406,7 +367,7 @@
  private void receiveChangeStatus(ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDN +
      TRACER.debugInfo("Replication domain " + getBaseDN() +
        " received change status message:\n" + csMsg);
    ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -416,7 +377,7 @@
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
          getBaseDNString(), Integer.toString(serverID)));
          getBaseDNString(), Integer.toString(getServerId())));
      return;
    }
@@ -468,13 +429,24 @@
  }
  /**
   * Returns the base DN of this ReplicationDomain.
   * Returns the current config of this ReplicationDomain.
   *
   * @return the config
   */
  protected ReplicationDomainCfg getConfig()
  {
    return config;
  }
  /**
   * Returns the base DN of this ReplicationDomain. All Replication Domain using
   * this baseDN will be connected through the Replication Service.
   *
   * @return The base DN of this ReplicationDomain
   */
  public DN getBaseDN()
  {
    return baseDN;
    return config.getBaseDN();
  }
  /**
@@ -484,16 +456,32 @@
   */
  public String getBaseDNString()
  {
    return baseDN.toNormalizedString();
    return getBaseDN().toNormalizedString();
  }
  /**
   * Get the server ID.
   * Get the server ID. The identifier of this Replication Domain inside the
   * Replication Service. Each Domain must use a unique ServerID.
   *
   * @return The server ID.
   */
  public int getServerId()
  {
    return serverID;
    return config.getServerId();
  }
  /**
   * Window size used during initialization .. between - the
   * initializer/exporter DS that listens/waits acknowledges and that slows down
   * data msg publishing based on the slowest server - and each
   * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
   * received.
   *
   * @return the initWindow
   */
  protected int getInitWindow()
  {
    return config.getInitializationWindowSize();
  }
  /**
@@ -502,25 +490,38 @@
   */
  public boolean isAssured()
  {
    return assured;
    return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
        || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
  }
  /**
   * Gives the mode for the assured replication of the domain.
   * Gives the mode for the assured replication of the domain. Only used when
   * assured is true).
   *
   * @return The mode for the assured replication of the domain.
   */
  public AssuredMode getAssuredMode()
  {
    return assuredMode;
    switch (assuredConfig.getAssuredType())
    {
    case SAFE_DATA:
    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
      return AssuredMode.SAFE_DATA_MODE;
    case SAFE_READ:
      return AssuredMode.SAFE_READ_MODE;
    }
    return null; // should never happen
  }
  /**
   * Gives the assured level of the replication of the domain.
   * Gives the assured Safe Data level of the replication of the domain. (used
   * when assuredMode is SAFE_DATA).
   *
   * @return The assured level of the replication of the domain.
   */
  public byte getAssuredSdLevel()
  {
    return assuredSdLevel;
    return (byte) assuredConfig.getAssuredSdLevel();
  }
  /**
@@ -529,7 +530,7 @@
   */
  public long getAssuredTimeout()
  {
    return assuredTimeout;
    return assuredConfig.getAssuredTimeout();
  }
  /**
@@ -538,16 +539,20 @@
   */
  public byte getGroupId()
  {
    return groupId;
    return (byte) config.getGroupId();
  }
  /**
   * Gets the referrals URLs this domain publishes.
   * Gets the referrals URLs this domain publishes. Referrals urls to be
   * published to other servers of the topology.
   * <p>
   * TODO: fill that with all currently opened urls if no urls configured
   *
   * @return The referrals URLs this domain publishes.
   */
  public List<String> getRefUrls()
  public Set<String> getRefUrls()
  {
    return refUrls;
    return config.getReferralsUrl();
  }
  /**
@@ -673,67 +678,6 @@
  }
  /**
   * Set the list of Referrals that should be returned when an
   * operation needs to be redirected to this server.
   *
   * @param referralsUrl The list of referrals.
   */
  public void setURLs(Set<String> referralsUrl)
  {
      this.refUrls.addAll(referralsUrl);
  }
  /**
   * Set the timeout of the assured replication.
   *
   * @param assuredTimeout the timeout of the assured replication.
   */
  public void setAssuredTimeout(long assuredTimeout)
  {
    this.assuredTimeout = assuredTimeout;
  }
  /**
   * Sets the groupID.
   *
   * @param groupId The groupID.
   */
  public void setGroupId(byte groupId)
  {
    this.groupId = groupId;
  }
  /**
   * Sets the level of assured replication.
   *
   * @param assuredSdLevel The level of assured replication.
   */
  public void setAssuredSdLevel(byte assuredSdLevel)
  {
    this.assuredSdLevel = assuredSdLevel;
  }
  /**
   * Sets the assured replication mode.
   *
   * @param dataMode The assured replication mode.
   */
  public void setAssuredMode(AssuredMode dataMode)
  {
    this.assuredMode = dataMode;
  }
  /**
   * Sets assured replication.
   *
   * @param assured A boolean indicating if assured replication should be used.
   */
  public void setAssured(boolean assured)
  {
    this.assured = assured;
  }
  /**
   * Receives an update message from the replicationServer.
   * The other types of messages are processed in an opaque way for the caller.
   * Also responsible for updating the list of pending changes
@@ -802,8 +746,8 @@
            */
            if (debugEnabled())
              TRACER.debugInfo(
                  "[IE] processErrorMsg:" + this.serverID +
                  " baseDN: " + this.baseDN +
                  "[IE] processErrorMsg:" + getServerId() +
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -873,10 +817,9 @@
    }
    numRcvdUpdates.incrementAndGet();
     byte rsGroupId = broker.getRsGroupId();
    if (update.isAssured()
        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
        && rsGroupId == groupId)
        && broker.getRsGroupId() == getGroupId()
        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
    {
      assuredSrReceivedUpdates.incrementAndGet();
    }
@@ -949,7 +892,7 @@
        requested servers. Log problem
        */
        logError(NOTE_DS_RECEIVED_ACK_ERROR.get(
            getBaseDNString(), Integer.toString(serverID),
            getBaseDNString(), Integer.toString(getServerId()),
            update.toString(), ack.errorsToString()));
        List<Integer> failedServers = ack.getFailedServers();
@@ -1048,7 +991,7 @@
     */
    public ExportThread(int serverIdToInitialize, int initWindow)
    {
      super("Export thread from serverId=" + serverID + " to serverId="
      super("Export thread from serverId=" + getServerId() + " to serverId="
          + serverIdToInitialize);
      this.serverIdToInitialize = serverIdToInitialize;
      this.initWindow = initWindow;
@@ -1379,7 +1322,7 @@
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, this.serverID, initTask, this.initWindow);
    initializeRemote(target, getServerId(), initTask, getInitWindow());
  }
  /**
@@ -1418,7 +1361,7 @@
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
          countEntries(), getBaseDNString(), serverID));
          countEntries(), getBaseDNString(), getServerId()));
      for (DSInfo dsi : getReplicasList())
      {
@@ -1436,8 +1379,8 @@
    }
    else
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
          countEntries(), getBaseDNString(), serverID, serverToInitialize));
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
          getBaseDNString(), getServerId(), serverToInitialize));
      ieContext.startList.add(serverToInitialize);
@@ -1471,7 +1414,7 @@
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            getBaseDN(), serverID, serverToInitialize,
            getBaseDN(), getServerId(), serverToInitialize,
            serverRunningTheTask, ieContext.entryCount, initWindow);
        broker.publish(initTargetMsg);
@@ -1492,8 +1435,8 @@
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
        // Notify the peer of the success
        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
        broker.publish(doneMsg);
        broker.publish(
            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
      }
      catch(DirectoryException exportException)
      {
@@ -1595,12 +1538,12 @@
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
          getBaseDNString(), serverID, cause));
          getBaseDNString(), getServerId(), cause));
    }
    else
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
          getBaseDNString(), serverID, serverToInitialize, cause));
          getBaseDNString(), getServerId(), serverToInitialize, cause));
    }
@@ -1894,10 +1837,8 @@
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            {
              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  this.serverID,
                  entryMsg.getSenderID(),
                  ieContext.msgCnt);
              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
              {
@@ -1945,7 +1886,7 @@
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      getBaseDNString(),
                      Integer.toString(this.serverID),
                      Integer.toString(getServerId()),
                      Integer.toString(ieContext.importSource)));
            ieContext.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, errMsg));
@@ -2017,7 +1958,7 @@
    // build the message
    EntryMsg entryMessage = new EntryMsg(
        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
    // Waiting the slowest loop
@@ -2219,7 +2160,7 @@
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
          getBaseDN(), serverID, source, this.initWindow);
          getBaseDN(), getServerId(), source, getInitWindow());
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
@@ -2281,14 +2222,14 @@
    try
    {
      // Log starting
      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
          getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID));
      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(),
          initTargetMsgReceived.getSenderID(), getServerId()));
      // Go into full update status
      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
      // Acquire an import context if no already done (and initialize).
      if (initTargetMsgReceived.getInitiatorID() != this.serverID)
      if (initTargetMsgReceived.getInitiatorID() != getServerId())
      {
        /*
        The initTargetMsgReceived is for an import initiated by the remote
@@ -2418,7 +2359,8 @@
      finally
      {
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
            getBaseDNString(), initTargetMsgReceived.getSenderID(),
            getServerId(),
            (ieContext.getException() == null ? ""
                : ieContext.getException().getLocalizedMessage()));
        logError(msg);
@@ -2458,7 +2400,7 @@
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
          Integer.toString(serverID), status.toString(), event.toString()));
          String.valueOf(getServerId()), status.toString(), event.toString()));
      return;
    }
@@ -2472,13 +2414,11 @@
        resetMonitoringCounters();
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
      {
        TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
            + status);
        TRACER.debugInfo("Replication domain " + getBaseDN()
            + " new status is: " + status);
      }
      // Perform whatever actions are needed to apply properties for being
@@ -2560,10 +2500,8 @@
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(-1);
    // Reconnect to the Replication Server so that it adopt our
    // GenerationID.
    disableService();
    enableService();
    // Reconnect to the Replication Server so that it adopts our GenerationID.
    restartService();
    // wait for the domain to reconnect.
    int count = 0;
@@ -2597,8 +2535,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
          + " resetGenerationId " + generationIdNewValue);
      TRACER.debugInfo("Server id " + getServerId() + " and domain "
          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
    }
    ResetGenerationIdMsg genIdMessage =
@@ -2607,7 +2545,7 @@
    if (!isConnected())
    {
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
          Integer.toString(serverID),
          Integer.toString(getServerId()),
          Long.toString(genIdMessage.getGenerationId()));
      throw new DirectoryException(ResultCode.OTHER, message);
    }
@@ -3110,33 +3048,19 @@
  }
  /**
   * Definitively stops the Replication Service.
   */
  public void stopDomain()
  {
    disableService();
    domains.remove(baseDN);
  }
  /**
   * Change some ReplicationDomain parameters.
   *
   * @param config
   *          The new configuration that this domain should now use.
   */
  public void changeConfig(ReplicationDomainCfg config)
  protected void changeConfig(ReplicationDomainCfg config)
  {
    this.groupId = (byte) config.getGroupId();
    if (broker != null && broker.changeConfig(config))
    {
      disableService();
      enableService();
      restartService();
    }
  }
  /**
   * Applies a configuration change to the attributes which should be be
   * included in the ECL.
@@ -3149,15 +3073,19 @@
  public void changeConfig(Set<String> includeAttributes,
      Set<String> includeAttributesForDeletes)
  {
    if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
        && broker != null)
    final boolean attrsModified = setEclIncludes(
        getServerId(), includeAttributes, includeAttributesForDeletes);
    if (attrsModified && broker != null)
    {
      disableService();
      enableService();
      restartService();
    }
  }
  private void restartService()
  {
    disableService();
    enableService();
  }
  /**
   * This method should trigger an export of the replicated data.
@@ -3236,15 +3164,13 @@
    Send an ack if it was requested and the group id is the same of the RS
    one. Only Safe Read mode makes sense in DS for returning an ack.
    */
    byte rsGroupId = broker.getRsGroupId();
    // Assured feature is supported starting from replication protocol V2
    if (msg.isAssured()
      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
    {
      AssuredMode msgAssuredMode = msg.getAssuredMode();
      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
      {
        if (rsGroupId == groupId)
        if (broker.getRsGroupId() == getGroupId())
        {
          // Send the ack
          AckMsg ackMsg = new AckMsg(msg.getCSN());
@@ -3255,7 +3181,7 @@
            ackMsg.setHasReplayError(true);
            //   -> replay error occurred in our server
            List<Integer> idList = new ArrayList<Integer>();
            idList.add(serverID);
            idList.add(getServerId());
            ackMsg.setFailedServers(idList);
          }
          broker.publish(ackMsg);
@@ -3269,10 +3195,11 @@
          }
        }
      }
      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
      {
        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
            msgAssuredMode.toString(), getBaseDNString(), msg.toString()));
        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()),
            msg.getAssuredMode().toString(), getBaseDNString(),
            msg.toString()));
      }
        // Nothing to do in Assured safe data mode, only RS ack updates.
    }
@@ -3303,23 +3230,22 @@
   */
  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
  {
    byte rsGroupId = broker.getRsGroupId();
    /*
     * If assured configured, set message accordingly to request an ack in the
     * right assured mode.
     * No ack requested for a RS with a different group id. Assured
     * replication supported for the same locality, i.e: a topology working in
     * the same
     * geographical location). If we are connected to a RS which is not in our
     * locality, no need to ask for an ack.
     * No ack requested for a RS with a different group id.
     * Assured replication supported for the same locality,
     * i.e: a topology working in the same geographical location).
     * If we are connected to a RS which is not in our locality,
     * no need to ask for an ack.
     */
    if (assured && rsGroupId == groupId)
    if (needsAck())
    {
      msg.setAssured(true);
      msg.setAssuredMode(assuredMode);
      if (assuredMode == AssuredMode.SAFE_DATA_MODE)
      msg.setAssuredMode(getAssuredMode());
      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
      {
        msg.setSafeDataLevel(assuredSdLevel);
        msg.setSafeDataLevel(getAssuredSdLevel());
      }
      // Add the assured message to the list of update that are waiting for acks
@@ -3327,6 +3253,11 @@
    }
  }
  private boolean needsAck()
  {
    return isAssured() && broker.getRsGroupId() == getGroupId();
  }
  /**
   * Wait for the processing of an assured message after it has been sent, if
   * assured replication is configured, otherwise, do nothing.
@@ -3340,14 +3271,10 @@
  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
    throws TimeoutException
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured mode configured, wait for acknowledgment for the just sent
    // message
    if (assured && rsGroupId == groupId)
    if (needsAck())
    {
      // Increment assured replication monitoring counters
      switch (assuredMode)
      switch (getAssuredMode())
      {
        case SAFE_READ_MODE:
          assuredSrSentUpdates.incrementAndGet();
@@ -3383,12 +3310,12 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "baseDN: " + baseDN);
              "baseDN: " + getBaseDN());
          }
          break;
        }
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout())
        {
          /*
          Timeout occurred, be sure that ack is not being received and if so,
@@ -3424,8 +3351,8 @@
          }
          throw new TimeoutException("No ack received for message csn: " + csn
              + " and replication domain: " + baseDN + " after "
              + assuredTimeout + " ms.");
              + " and replication domain: " + getBaseDN() + " after "
              + getAssuredTimeout() + " ms.");
        }
      }
    }
@@ -3481,7 +3408,7 @@
    {
      // This exception may only be raised if assured replication is enabled
      logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
          Long.toString(assuredTimeout), update.toString()));
          Long.toString(getAssuredTimeout()), update.toString()));
    }
  }
@@ -3493,11 +3420,25 @@
   *
   * @return The GenerationID.
   */
  public abstract long getGenerationID();
  public long getGenerationID()
  {
    return generationId;
  }
  /**
   * Subclasses should use this method to add additional monitoring
   * information in the ReplicationDomain.
   * Sets the generationId for this replication domain.
   *
   * @param generationId
   *          the generationId to set
   */
  public void setGenerationID(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Subclasses should use this method to add additional monitoring information
   * in the ReplicationDomain.
   *
   * @return Additional monitoring attributes that will be added in the
   *         ReplicationDomain monitoring entry.
@@ -3711,13 +3652,69 @@
   */
  public CSN getLastLocalChange()
  {
    return state.getCSN(serverID);
    return state.getCSN(getServerId());
  }
  /**
   * Gets and stores the assured replication configuration parameters. Returns a
   * boolean indicating if the passed configuration has changed compared to
   * previous values and the changes require a reconnection.
   *
   * @param config
   *          The configuration object
   * @param allowReconnection
   *          Tells if one must reconnect if significant changes occurred
   */
  protected void readAssuredConfig(ReplicationDomainCfg config,
      boolean allowReconnection)
  {
    // Disconnect if required: changing configuration values before
    // disconnection would make assured replication used immediately and
    // disconnection could cause some timeouts error.
    if (needReconnection(config) && allowReconnection)
    {
      disableService();
      assuredConfig = config;
      enableService();
    }
  }
  private boolean needReconnection(ReplicationDomainCfg cfg)
  {
    final AssuredMode assuredMode = getAssuredMode();
    switch (cfg.getAssuredType())
    {
    case NOT_ASSURED:
      if (isAssured())
      {
        return true;
      }
      break;
    case SAFE_DATA:
      if (!isAssured() || assuredMode == SAFE_READ_MODE)
      {
        return true;
      }
      break;
    case SAFE_READ:
      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
      {
        return true;
      }
      break;
    }
    return isAssured()
        && assuredMode == SAFE_DATA_MODE
        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -20,13 +20,14 @@
 *
 * CDDL HEADER END
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
import java.util.TreeSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
@@ -39,12 +40,9 @@
public class DummyReplicationDomain extends ReplicationDomain
{
  private final long generationId;
  public DummyReplicationDomain(long generationId)
  {
    super(null, -1, 0);
    this.generationId = generationId;
    super(new DomainFakeCfg(null, -1, new TreeSet<String>()), generationId);
  }
  @Override
@@ -92,10 +90,4 @@
    return false;
  }
  @Override
  public long getGenerationID()
  {
    return this.generationId;
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -538,6 +538,16 @@
    replicationServer = new ReplicationServer(conf);
  }
  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
      SortedSet<String> replicationServers, long heartbeatInterval)
  {
    DomainFakeCfg fakeCfg =
        new DomainFakeCfg(baseDN, serverID, replicationServers);
    fakeCfg.setHeartbeatInterval(heartbeatInterval);
    fakeCfg.setChangetimeHeartbeatInterval(500);
    return fakeCfg;
  }
  /**
   * This class is the minimum implementation of a Concrete ReplicationDomain
   * used to be able to connect to the RS with a known genid. Also to be able
@@ -561,18 +571,14 @@
     */
    private StringBuilder importString;
    private int exportedEntryCount;
    private long generationID = -1;
    public FakeReplicationDomain(DN baseDN, int serverID,
        SortedSet<String> replicationServers, long heartbeatInterval,
        long generationId) throws ConfigException
    {
      super(baseDN, serverID, 100);
      generationID = generationId;
      DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
      fakeCfg.setHeartbeatInterval(heartbeatInterval);
      fakeCfg.setChangetimeHeartbeatInterval(500);
      startPublishService(fakeCfg);
      super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval),
          generationId);
      startPublishService(getConfig());
      startListenService();
    }
@@ -604,12 +610,6 @@
    }
    @Override
    public long getGenerationID()
    {
      return generationID;
    }
    @Override
    protected void importBackend(InputStream input) throws DirectoryException
    {
      byte[] buffer = new byte[1000];
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -61,7 +61,7 @@
 */
public class TopologyViewTest extends ReplicationTestCase
{
  // Server id definitions
  /** Server id definitions */
  private static final int DS1_ID = 1;
  private static final int DS2_ID = 2;
  private static final int DS3_ID = 3;
@@ -72,7 +72,7 @@
  private static final int RS2_ID = 52;
  private static final int RS3_ID = 53;
  // Group id definitions
  /** Group id definitions */
  private static final int DS1_GID = 1;
  private static final int DS2_GID = 1;
  private static final int DS3_GID = 2;
@@ -83,7 +83,7 @@
  private static final int RS2_GID = 2;
  private static final int RS3_GID = 3;
  // Assured conf definitions
  /** Assured conf definitions */
  private static final AssuredType DS1_AT = AssuredType.NOT_ASSURED;
  private static final int DS1_SDL = -1;
  private static SortedSet<String> DS1_RU = new TreeSet<String>();
@@ -140,7 +140,7 @@
  private ReplicationServer rs2 = null;
  private ReplicationServer rs3 = null;
  // The tracer object for the debug logger
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  private void debugInfo(String s)
@@ -460,7 +460,7 @@
    return replicationDomain;
  }
  // Definitions of steps for the test case
  /** Definitions of steps for the test case */
  private static final int STEP_1 = 1;
  private static final int STEP_2 = 2;
  private static final int STEP_3 = 3;
@@ -824,17 +824,13 @@
      }
    // Perform necessary conversions
    boolean assuredFlag = (assuredType != AssuredType.NOT_ASSURED);
    AssuredMode assMode = ( (assuredType == AssuredType.SAFE_READ) ?
      AssuredMode.SAFE_READ_MODE : AssuredMode.SAFE_DATA_MODE);
    List<String> urls = new ArrayList<String>();
    for(String str : refUrls)
    {
      urls.add(str);
    }
    boolean assuredFlag = assuredType != AssuredType.NOT_ASSURED;
    AssuredMode assMode = assuredType == AssuredType.SAFE_READ
        ? AssuredMode.SAFE_READ_MODE
        : AssuredMode.SAFE_DATA_MODE;
    return new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
       (byte)assuredSdLevel, groupId, urls, eclIncludes, eclIncludes, protocolVersion);
       (byte)assuredSdLevel, groupId, refUrls, eclIncludes, eclIncludes, protocolVersion);
  }
  /**
@@ -1018,36 +1014,20 @@
     /**
      * Get the topo view of the current analyzed DS
      */
     List<DSInfo> internalDsList = rd.getReplicasList();
     // Add info for DS itself:
     // we need to clone the list as we don't want to modify the list kept
     // inside the DS.
     List<DSInfo> dsList = new ArrayList<DSInfo>();
     for (DSInfo aDsInfo : internalDsList)
     {
       dsList.add(aDsInfo);
     }
     int dsId = rd.getServerId();
     int rsId = rd.getRsServerId();
     ServerStatus status = rd.getStatus();
     boolean assuredFlag = rd.isAssured();
     AssuredMode assuredMode = rd.getAssuredMode();
     byte safeDataLevel = rd.getAssuredSdLevel();
     byte groupId = rd.getGroupId();
     List<String> refUrls = rd.getRefUrls();
     Set<String> eclInclude = rd.getEclIncludes();
     Set<String> eclIncludeForDeletes = rd.getEclIncludesForDeletes();
     short protocolVersion = ProtocolVersion.getCurrentVersion();
     DSInfo dsInfo = new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assuredMode,
       safeDataLevel, groupId, refUrls, eclInclude, eclIncludeForDeletes, protocolVersion);
     dsList.add(dsInfo);
      final DSInfo dsInfo = new DSInfo(rd.getServerId(), "dummy:1234", rd.getRsServerId(),
          TEST_DN_WITH_ROOT_ENTRY_GENID,
          rd.getStatus(),
          rd.isAssured(), rd.getAssuredMode(), rd.getAssuredSdLevel(),
          rd.getGroupId(), rd.getRefUrls(),
          rd.getEclIncludes(), rd.getEclIncludesForDeletes(),
          ProtocolVersion.getCurrentVersion());
      final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList());
      dsList.add(dsInfo);
     TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
     /**
      * Compare to what is the expected view
      */
     assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId);
   }
  }
@@ -1058,8 +1038,8 @@
   */
  private class TopoView
  {
    private List<DSInfo> dsList = null;
    private List<RSInfo> rsList = null;
    private List<DSInfo> dsList;
    private List<RSInfo> rsList;
    public TopoView(List<DSInfo> dsList, List<RSInfo> rsList)
    {
@@ -1073,20 +1053,23 @@
    @Override
    public boolean equals(Object obj)
    {
      assertNotNull(obj);
      assertFalse(obj.getClass() != this.getClass());
      TopoView topoView = (TopoView) obj;
      // Check dsList
      if (topoView.dsList.size() != dsList.size())
      if (obj == null || getClass() != obj.getClass())
        return false;
      for (DSInfo dsInfo : topoView.dsList)
      TopoView other = (TopoView) obj;
      return checkLists(dsList, other.dsList)
          && checkLists(rsList, other.rsList);
    }
    private boolean checkLists(List<?> list, List<?> otherList)
    {
      if (otherList.size() != list.size())
        return false;
      for (Object otherObj : otherList)
      {
        int found = 0;
        for (DSInfo thisDsInfo : dsList)
        for (Object thisObj : list)
        {
          if (thisDsInfo.equals(dsInfo))
          if (thisObj.equals(otherObj))
            found++;
        }
        // Not found
@@ -1096,52 +1079,33 @@
        assertFalse(found > 1);
      // Ok, found exactly once in the list, examine next structure
      }
      // Check rsList
      if (topoView.rsList.size() != rsList.size())
        return false;
      for (RSInfo rsInfo : topoView.rsList)
      {
        int found = 0;
        for (RSInfo thisRsInfo : rsList)
        {
          if (thisRsInfo.equals(rsInfo))
            found++;
        }
        // Not found
        if (found == 0)
          return false;
        // Should never see twice as rsInfo structure in a dsList
        assertFalse(found > 1);
      // Ok, found exactly once in the list, examine next structure
      }
      return true;
    }
    @Override
    public String toString()
    {
      String dsStr = "";
      final StringBuilder sb = new StringBuilder("TopoView:");
      sb.append("\n----------------------------\n");
      sb.append("CONNECTED DS SERVERS:\n");
      for (DSInfo dsInfo : dsList)
      {
        dsStr += dsInfo.toString() + "\n----------------------------\n";
        sb.append(dsInfo).append("\n----------------------------\n");
      }
      String rsStr = "";
      sb.append("CONNECTED RS SERVERS:\n");
      for (RSInfo rsInfo : rsList)
      {
        rsStr += rsInfo.toString() + "\n----------------------------\n";
        sb.append(rsInfo).append("\n----------------------------\n");
      }
      return sb.toString();
    }
      return ("TopoView:" +
        "\n----------------------------\n" + "CONNECTED DS SERVERS:\n" + dsStr +
        "CONNECTED RS SERVERS:\n" + rsStr);
    private TopologyViewTest getOuterType()
    {
      return TopologyViewTest.this;
    }
  }
  private String getHostPort(int port)
  {
    return LOCAL_HOST_NAME + ":" + port;
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -38,6 +38,7 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
@@ -56,6 +57,7 @@
import org.testng.annotations.Test;
import static java.util.Arrays.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -239,14 +241,12 @@
   * (no server state constructor version)
   */
  private FakeReplicationDomain createFakeReplicationDomain(int serverId,
    int groupId, int rsId, long generationId, boolean assured,
    AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
    int scenario)
        throws Exception
      int groupId, int rsId, long generationId, AssuredMode assuredMode,
      int safeDataLevel, long assuredTimeout, int scenario) throws Exception
  {
    ReplicationDomainCfg config = newFakeCfg(serverId, getRsPort(rsId), groupId);
    return createFakeReplicationDomain(config, groupId, rsId, generationId, assured,
      assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true);
    return createFakeReplicationDomain(serverId, groupId, rsId, generationId,
        assuredMode, safeDataLevel, assuredTimeout, scenario,
        new ServerState(), true);
  }
  private int getRsPort(int rsId)
@@ -284,17 +284,23 @@
   * @throws Exception
   */
  private FakeReplicationDomain createFakeReplicationDomain(
      ReplicationDomainCfg config, int groupId, int rsId, long generationId,
      boolean assured, AssuredMode assuredMode, int safeDataLevel,
      int serverId, int groupId, int rsId, long generationId,
      AssuredMode assuredMode, int safeDataLevel,
      long assuredTimeout, int scenario, ServerState serverState,
      boolean startListen) throws Exception
  {
    // Set port to right real RS according to its id
    int rsPort = getRsPort(rsId);
    final DomainFakeCfg config = newDomainConfig(serverId, groupId, rsId,
        assuredMode, safeDataLevel, assuredTimeout);
    return createFakeReplicationDomain(config, rsId, generationId, scenario,
        serverState, startListen);
  }
    FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
        config.getBaseDN(), config.getServerId(), generationId, (byte) groupId,
        assured, assuredMode, (byte) safeDataLevel, assuredTimeout, scenario, serverState);
  private FakeReplicationDomain createFakeReplicationDomain(
      ReplicationDomainCfg config, int rsId, long generationId, int scenario,
      ServerState serverState, boolean startListen) throws Exception
  {
    FakeReplicationDomain fakeReplicationDomain =
        new FakeReplicationDomain(config, generationId, scenario, serverState);
    fakeReplicationDomain.startPublishService(config);
    if (startListen)
@@ -304,20 +310,42 @@
    assertTrue(fakeReplicationDomain.isConnected());
    // Check connected server port
    HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
    assertEquals(rd.getPort(), rsPort);
    assertEquals(rd.getPort(), getRsPort(rsId));
    return fakeReplicationDomain;
  }
  private DomainFakeCfg newFakeCfg(int serverId, int rsPort, int groupId) throws Exception
  private DomainFakeCfg newDomainConfig(int serverId, int groupId, int rsId,
      AssuredMode assuredMode, int safeDataLevel, long assuredTimeout)
      throws DirectoryException
  {
    DomainFakeCfg fakeCfg = new DomainFakeCfg(
        DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort), groupId);
    final int rsPort = getRsPort(rsId);
    final DomainFakeCfg fakeCfg = new DomainFakeCfg(
        DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort),
        getAssuredType(assuredMode),
        safeDataLevel, groupId, assuredTimeout, new TreeSet<String>());
    fakeCfg.setHeartbeatInterval(1000);
    fakeCfg.setChangetimeHeartbeatInterval(500);
    return fakeCfg;
  }
  private AssuredType getAssuredType(AssuredMode assuredMode)
  {
    if (assuredMode == null)
    {
      return AssuredType.NOT_ASSURED;
    }
    switch (assuredMode)
    {
    case SAFE_READ_MODE:
      return AssuredType.SAFE_READ;
    case SAFE_DATA_MODE:
      return AssuredType.SAFE_DATA;
    }
    throw new RuntimeException("Not implemented for " + assuredMode);
  }
  /**
   * Creates and connects a new fake replication server, using the passed scenario.
   */
@@ -412,9 +440,8 @@
  {
    /** The scenario this DS is expecting */
    private int scenario = -1;
    private long generationId = -1;
    private CSNGenerator gen = null;
    private CSNGenerator gen;
    /** False if a received update had assured parameters not as expected */
    private boolean everyUpdatesAreOk = true;
@@ -437,28 +464,14 @@
     * behavior upon reception of updates)
     * @throws org.opends.server.config.ConfigException
     */
    public FakeReplicationDomain(
      DN baseDN,
      int serverID,
      long generationId,
      byte groupId,
      boolean assured,
      AssuredMode assuredMode,
      byte safeDataLevel,
      long assuredTimeout,
      int scenario,
      ServerState serverState) throws ConfigException
    public FakeReplicationDomain(ReplicationDomainCfg config,
        long generationId, int scenario, ServerState serverState)
        throws ConfigException
    {
      super(baseDN, serverID, serverState);
      this.generationId = generationId;
      setGroupId(groupId);
      setAssured(assured);
      setAssuredMode(assuredMode);
      setAssuredSdLevel(safeDataLevel);
      setAssuredTimeout(assuredTimeout);
      super(config, generationId, serverState);
      this.scenario = scenario;
      gen = new CSNGenerator(serverID, 0L);
      gen = new CSNGenerator(config.getServerId(), 0L);
    }
    public boolean receivedUpdatesOk()
@@ -490,12 +503,6 @@
    }
    @Override
    public long getGenerationID()
    {
      return generationId;
    }
    @Override
    protected void importBackend(InputStream input) throws DirectoryException
    {
      // Not needed for this test
@@ -549,8 +556,8 @@
        debugInfo("Fake DS " + getServerId() + " received update assured flag is wrong: " + updateMsg);
        ok = false;
      }
      if (updateMsg.getAssuredMode() !=  getAssuredMode())
      {
      if (isAssured() && updateMsg.getAssuredMode() != getAssuredMode())
      { // it is meaningless to have different assured mode when UpdateMsg is not assured
        debugInfo("Fake DS " + getServerId() + " received update assured mode is wrong: " + updateMsg);
        ok = false;
      }
@@ -645,7 +652,7 @@
     * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
     * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
     * @param groupId our group id
     * @param baseDN the basedn we connect with, to the real RS
     * @param baseDN the baseDN we connect with, to the real RS
     * @param generationId the generation id we use at connection to real RS
     */
    public FakeReplicationServer(int port, int serverId, boolean assured,
@@ -1022,9 +1029,7 @@
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, 0);
      assertNotNull(rs1);
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
      /*
       * Start main DS (the one which sends updates)
@@ -1033,8 +1038,7 @@
      // Create and connect fake domain 1 to RS 1
      // Assured mode: SD, level 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, mainDsGid, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      /*
       * Start one other fake DS
@@ -1049,8 +1053,7 @@
        // this would timeout. If main DS group id is not the same as the real RS one,
        // the update will even not come to real RS as assured
        fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
          TIMEOUT_DS_SCENARIO);
            DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      }
      /*
@@ -1363,9 +1366,7 @@
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, 0);
      assertNotNull(rs1);
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
      /*
       * Start main DS (the one which sends updates)
@@ -1373,8 +1374,7 @@
      // Create and connect fake domain 1 to RS 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      /*
       * Start one other fake DS
@@ -1384,8 +1384,7 @@
      if (otherFakeDS)
      {
        fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
          otherFakeDsGenId, false, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
          TIMEOUT_DS_SCENARIO);
            otherFakeDsGenId, null, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      }
      /*
@@ -1873,9 +1872,7 @@
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, 0);
      assertNotNull(rs1);
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
      /*
       * Start fake RS to make the RS have the default generation id
@@ -1976,20 +1973,13 @@
       */
      int numberOfRealRSs = 3;
      // Create real RS 1
      // Create real RS 1, 2, 3
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs1);
      // Create real RS 2
      rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs2);
      // Create real RS 3
      rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs3);
      /*
       * Start DS that will send updates
@@ -1998,8 +1988,7 @@
      // Wait for RSs to connect together
      // Create and connect fake domain 1 to RS 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // Wait for RSs connections to be finished
      // DS must see expected numbers of RSs
@@ -2049,9 +2038,7 @@
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, 0);
      assertNotNull(rs1);
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
      /*******************
       * Start main DS 1 (the one which sends updates)
@@ -2060,8 +2047,7 @@
      // Create and connect DS 1 to RS 1
      // Assured mode: SR
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      /*
       * Send a first assured safe read update
@@ -2089,10 +2075,9 @@
      // Create and connect DS 2 to RS 1
      // Assured mode: SR
      ServerState serverState = fakeRd1.getServerState();
      ReplicationDomainCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID);
      fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
              REPLY_OK_DS_SCENARIO, serverState, true);
      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
          REPLY_OK_DS_SCENARIO, serverState, true);
      // Wait for connections to be established
      waitForStableTopo(fakeRd1, 1, 1);
@@ -2306,31 +2291,26 @@
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, 0);
      assertNotNull(rs1);
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
      /*
       * Start main DS 1 (the one which sends updates)
       */
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      /*
       * Start another fake DS 2 connected to RS
       */
      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      /*
       * Start another fake DS 3 connected to RS
       */
      fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
        otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        otherFakeDsScen);
          otherFakeDsGenId, otherFakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null,
          1, LONG_TIMEOUT, otherFakeDsScen);
      /*
       * Start fake RS (RS 1) connected to RS
@@ -2617,25 +2597,17 @@
       */
      int numberOfRealRSs = 4;
      // Create real RS 1
      // Create real RS 1, 2, 3
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs1);
      // Create real RS 2
      rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs2);
      // Create real RS 3
      rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs3);
      // Create real RS 4 (different GID 2)
      rs4 = createReplicationServer(RS4_ID, OTHER_GID_BIS, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs4);
      /*
       * Start DS 1 that will send assured updates
@@ -2644,8 +2616,7 @@
      // Wait for RSs to connect together
      // Create and connect fake domain 1 to RS 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // Wait for connections to be finished
      // DS must see expected numbers of DSs/RSs
@@ -2674,23 +2645,19 @@
      // DS 2 connected to RS 1
      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      // DS 3 connected to RS 2
      fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, DEFAULT_GID, RS2_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      // DS 4 connected to RS 3
      fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, DEFAULT_GID, RS3_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      // DS 5 connected to RS 3
      fakeRDs[5] = createFakeReplicationDomain(FDS5_ID, DEFAULT_GID, RS3_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      /*
       * Start DSs that will not receive updates from DS 1 as assured because
@@ -2699,23 +2666,19 @@
      // DS 6 connected to RS 1
      fakeRDs[6] = createFakeReplicationDomain(FDS6_ID, OTHER_GID, RS1_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 7 connected to RS 2
      fakeRDs[7] = createFakeReplicationDomain(FDS7_ID, OTHER_GID, RS2_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 8 connected to RS 3
      fakeRDs[8] = createFakeReplicationDomain(FDS8_ID, OTHER_GID, RS3_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 9 (GID 2) connected to RS 4
      fakeRDs[9] = createFakeReplicationDomain(FDS9_ID, OTHER_GID_BIS, RS4_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      /*
       * Start DSs that will not receive updates from DS 1 because
@@ -2724,18 +2687,15 @@
      // DS 10 connected to RS 1
      fakeRDs[10] = createFakeReplicationDomain(FDS10_ID, DEFAULT_GID, RS1_ID,
        OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 11 connected to RS 2
      fakeRDs[11] = createFakeReplicationDomain(FDS11_ID, DEFAULT_GID, RS2_ID,
        OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 12 connected to RS 3
      fakeRDs[12] = createFakeReplicationDomain(FDS12_ID, DEFAULT_GID, RS3_ID,
        OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // Wait for connections to be finished
      // DS must see expected numbers of DSs/RSs
@@ -2897,15 +2857,11 @@
       */
      int numberOfRealRSs = 2;
      // Create real RS 1
      // Create real RS 1, 2
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs1);
      // Create real RS 2
      rs2 = createReplicationServer(RS2_ID, OTHER_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs2);
      /*
       * Start DSs with GID=DEFAULT_GID, connected to RS1
@@ -2913,13 +2869,11 @@
      // DS 1 connected to RS 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 2 connected to RS 1
      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      /*
       * Start DSs with GID=OTHER_GID, connected to RS2
@@ -2927,13 +2881,11 @@
      // DS 3 connected to RS 2
      fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, OTHER_GID, RS2_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      // DS 4 connected to RS 3
      fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, OTHER_GID, RS2_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
          DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
      // Wait for connections to be finished
      // DS must see expected numbers of DSs/RSs
@@ -3010,15 +2962,11 @@
       */
      int numberOfRealRSs = 2;
      // Create real RS 1
      // Create real RS 1, 2
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT + 1000, // Be sure DS2 timeout is seen from DS1
        testCase, numberOfRealRSs);
      assertNotNull(rs1);
      // Create real RS 2
      rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase, numberOfRealRSs);
      assertNotNull(rs2);
      /*
       * Start 2 fake DSs
@@ -3026,13 +2974,12 @@
      // DS 1 connected to RS 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 2 connected to RS 2
      fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID,
        fakeDsGenId, (fakeDsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen);
          fakeDsGenId, fakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null,
          1, LONG_TIMEOUT, fakeDsScen);
      // Wait for connections to be finished
      // DS must see expected numbers of DSs/RSs
@@ -3167,15 +3114,14 @@
      // DS 1 connected to RS 1
      fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
          DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
      // DS 2 connected to RS 1 with low window to easily put it in DEGRADED status
      DomainFakeCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID);
      final DomainFakeCfg config = newDomainConfig(FDS2_ID, DEFAULT_GID, RS1_ID,
          AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT);
      config.setWindowSize(2);
      fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO, new ServerState(), false);
      fakeRDs[2] = createFakeReplicationDomain(config, RS1_ID, DEFAULT_GENID,
          REPLY_OK_DS_SCENARIO, new ServerState(), false);
      // Wait for connections to be finished
      // DS must see expected numbers of DSs/RSs
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -65,19 +65,24 @@
  private int exportedEntryCount;
  private long generationID = 1;
  private FakeReplicationDomain(DN baseDN, int serverID,
      SortedSet<String> replicationServers, int window, long heartbeatInterval)
      throws ConfigException
  {
    super(baseDN, serverID, 100);
    super(newConfig(baseDN, serverID, replicationServers, window,
        heartbeatInterval), 1);
    startPublishService(getConfig());
    startListenService();
  }
  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
      SortedSet<String> replicationServers, int window, long heartbeatInterval)
  {
    DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
    fakeCfg.setHeartbeatInterval(heartbeatInterval);
    fakeCfg.setChangetimeHeartbeatInterval(500);
    fakeCfg.setWindowSize(window);
    startPublishService(fakeCfg);
    startListenService();
    return fakeCfg;
  }
  public FakeReplicationDomain(DN baseDN, int serverID,
@@ -122,12 +127,6 @@
  }
  @Override
  public long getGenerationID()
  {
    return generationID;
  }
  @Override
  protected void importBackend(InputStream input) throws DirectoryException
  {
    byte[] buffer = new byte[1000];
@@ -157,8 +156,4 @@
    return true;
  }
  public void setGenerationID(long newGenerationID)
  {
    generationID = newGenerationID;
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -58,14 +58,20 @@
      SortedSet<String> replicationServers, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(baseDN, serverID, 100);
    super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), 1);
    startPublishService(getConfig());
    startListenService();
    this.queue = queue;
  }
  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
      SortedSet<String> replicationServers, long heartbeatInterval)
  {
    final DomainFakeCfg fakeCfg =
        new DomainFakeCfg(baseDN, serverID, replicationServers);
    fakeCfg.setHeartbeatInterval(heartbeatInterval);
    fakeCfg.setChangetimeHeartbeatInterval(500);
    startPublishService(fakeCfg);
    startListenService();
    this.queue = queue;
    return fakeCfg;
  }
  private static final int IMPORT_SIZE = 100000000;
@@ -98,12 +104,6 @@
  }
  @Override
  public long getGenerationID()
  {
    return 1;
  }
  @Override
  protected void importBackend(InputStream input) throws DirectoryException
  {
    long startDate = System.currentTimeMillis();