mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.31.2013 2a1a1bec32261f04e304eea6c7a1f045a3bedba5
serviceId => baseDN (To make the code less confusing)
13 files modified
712 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ListenerThread.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 483 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java 30 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTask.java 47 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java 30 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java 59 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 19 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 9 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2331,7 +2331,7 @@
        {
          // This exception may only be raised if assured replication is
          // enabled
          Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
          Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
            Long.toString(getAssuredTimeout()), msg.toString());
          logError(errorMsg);
        }
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -848,14 +848,14 @@
   */
  public static Set<String> getECLDisabledDomains()
  {
    Set<String> disabledServiceIDs = new HashSet<String>();
    Set<String> disabledBaseDNs = new HashSet<String>();
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (!domain.isECLEnabled())
        disabledServiceIDs.add(domain.getBaseDN().toNormalizedString());
        disabledBaseDNs.add(domain.getBaseDN().toNormalizedString());
    }
    return disabledServiceIDs;
    return disabledBaseDNs;
  }
  /**
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -241,7 +241,7 @@
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() +
                " getNextEligibleMessageForDomain(" + opid+ ") "
                + " got new message : "
                +  " serviceId=[" + mh.getBaseDN()
                +  " baseDN=[" + mh.getBaseDN()
                + "] [newMsg=" + newMsg + "]" + dumpState());
          // in non blocking mode, return null when no more msg
opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -63,7 +63,8 @@
  public ListenerThread(ReplicationDomain repDomain)
  {
    super("Replica DS(" + repDomain.getServerId()
        + ") listener for domain \"" + repDomain.getServiceID()
 + ") listener for domain \""
        + repDomain.getBaseDNString()
        + "\"");
    this.repDomain = repDomain;
  }
@@ -145,7 +146,8 @@
        if (n >= FACTOR)
        {
          TRACER.debugInfo("Interrupting listener thread for dn "
              + repDomain.getServiceID() + " in DS " + repDomain.getServerId());
              + repDomain.getBaseDNString() + " in DS "
              + repDomain.getServerId());
          this.interrupt();
        }
      }
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -117,11 +117,11 @@
  private static final DebugTracer TRACER = getTracer();
  /**
   *  An identifier for the Replication Service.
   *  All Replication Domain using this identifier will be connected
   *  The baseDN for the Replication Service.
   *  All Replication Domain using this baseDN will be connected
   *  through the Replication Service.
   */
  private final String serviceID;
  private final String baseDN;
  /**
   * The identifier of this Replication Domain inside the
@@ -167,19 +167,22 @@
  /*
   * Assured mode properties
   */
  // Is assured mode enabled or not for this domain ?
  /** Whether assured mode is enabled for this domain. */
  private boolean assured = false;
  // Assured sub mode (used when assured is true)
  /** Assured sub mode (used when assured is true). */
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // Safe Data level (used when assuredMode is SAFE_DATA)
  private byte assuredSdLevel = (byte)1;
  // The timeout in ms that should be used, when waiting for assured acks
  /** Safe Data level (used when assuredMode is SAFE_DATA). */
  private byte assuredSdLevel = 1;
  /** The timeout in ms that should be used, when waiting for assured acks. */
  private long assuredTimeout = 2000;
  // Group id
  private byte groupId = (byte)1;
  // Referrals urls to be published to other servers of the topology
  // TODO: fill that with all currently opened urls if no urls configured
  /** Group id. */
  private byte groupId = 1;
  /**
   * Referrals urls to be published to other servers of the topology.
   * <p>
   * TODO: fill that with all currently opened urls if no urls configured
   */
  private final List<String> refUrls = new ArrayList<String>();
  /**
@@ -189,52 +192,77 @@
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
  /* Assured replication monitoring counters */
  /** Assured replication monitoring counters. */
  // Number of updates sent in Assured Mode, Safe Read
  /** Number of updates sent in Assured Mode, Safe Read. */
  private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Read, that have been
  // successfully acknowledged
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have been
   * successfully acknowledged.
   */
  private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Read, that have not been
  // successfully acknowledged (either because of timeout, wrong status or error
  // at replay)
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged (either because of timeout, wrong status or error
   * at replay).
   */
  private AtomicInteger assuredSrNotAcknowledgedUpdates =
    new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Read, that have not been
  // successfully acknowledged because of timeout
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of timeout.
   */
  private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Read, that have not been
  // successfully acknowledged because of wrong status
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of wrong status.
   */
  private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Read, that have not been
  // successfully acknowledged because of replay error
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of replay error.
   */
  private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0);
  // Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
  // that have not been successfully acknowledged (either because of timeout,
  // wrong status or error at replay) for a particular server (DS or RS). String
  // format: <server id>:<number of failed updates>
  /**
   * Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
   * that have not been successfully acknowledged (either because of timeout,
   * wrong status or error at replay) for a particular server (DS or RS).
   * <p>
   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
   */
  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Integer,Integer>();
  // Number of updates received in Assured Mode, Safe Read request
  /** Number of updates received in Assured Mode, Safe Read request. */
  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
  // acked without errors
  /**
   * Number of updates received in Assured Mode, Safe Read request that we have
   * acked without errors.
   */
  private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
  // acked with errors
  /**
   * Number of updates received in Assured Mode, Safe Read request that we have
   * acked with errors.
   */
  private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data
  /** Number of updates sent in Assured Mode, Safe Data. */
  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data, that have been
  // successfully acknowledged
  /**
   * Number of updates sent in Assured Mode, Safe Data, that have been
   * successfully acknowledged.
   */
  private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data, that have not been
  // successfully acknowledged because of timeout
  /**
   * Number of updates sent in Assured Mode, Safe Data, that have not been
   * successfully acknowledged because of timeout.
   */
  private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0);
  // Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
  // that have not been successfully acknowledged because of timeout for a
  // particular RS. String format: <server id>:<number of failed updates>
  /**
   * Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
   * that have not been successfully acknowledged because of timeout for a
   * particular RS.
   * <p>
   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
   */
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
@@ -299,7 +327,7 @@
  /**
   * Creates a ReplicationDomain with the provided parameters.
   *
   * @param serviceID  The identifier of the Replication Domain to which
   * @param baseDN     The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
@@ -307,42 +335,22 @@
   *                   is participating to a given Replication Domain.
   * @param initWindow Window used during initialization.
   */
  public ReplicationDomain(String serviceID, int serverID,int initWindow)
  public ReplicationDomain(String baseDN, int serverID,int initWindow)
  {
    this.serviceID = serviceID;
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.initWindow = initWindow;
    this.state = new ServerState();
    this.generator = new ChangeNumberGenerator(serverID, state);
    domains.put(serviceID, this);
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   *
   * @param serviceID  The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   */
  public ReplicationDomain(String serviceID, int serverID)
  {
    this.serviceID = serviceID;
    this.serverID = serverID;
    this.state = new ServerState();
    this.generator = new ChangeNumberGenerator(serverID, state);
    domains.put(serviceID, this);
    domains.put(baseDN, this);
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   * (for unit test purpose only)
   *
   * @param serviceID  The identifier of the Replication Domain to which
   * @param baseDN     The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
@@ -350,15 +358,15 @@
   *                   is participating to a given Replication Domain.
   * @param serverState The serverState to use
   */
  public ReplicationDomain(String serviceID, int serverID,
  public ReplicationDomain(String baseDN, int serverID,
    ServerState serverState)
  {
    this.serviceID = serviceID;
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.state = serverState;
    this.generator = new ChangeNumberGenerator(serverID, state);
    domains.put(serviceID, this);
    domains.put(baseDN, this);
  }
  /**
@@ -389,7 +397,7 @@
    if (!isValidInitialStatus(initStatus))
    {
      Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
        serviceID, Integer.toString(serverID));
        baseDN, Integer.toString(serverID));
      logError(msg);
    } else
    {
@@ -408,7 +416,7 @@
  private void receiveChangeStatus(ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + serviceID +
      TRACER.debugInfo("Replication domain " + baseDN +
        " received change status message:\n" + csMsg);
    ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -418,7 +426,7 @@
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
        serviceID, Integer.toString(serverID));
        baseDN, Integer.toString(serverID));
      logError(msg);
      return;
    }
@@ -474,13 +482,13 @@
  }
  /**
   * Gets the identifier of this domain.
   * Gets the baseDN of this domain.
   *
   * @return The identifier for this domain.
   * @return The baseDN for this domain.
   */
  public String getServiceID()
  public String getBaseDNString()
  {
    return serviceID;
    return baseDN;
  }
  /**
@@ -631,8 +639,7 @@
  {
    if (numProcessedUpdates != null)
      return numProcessedUpdates.get();
    else
      return 0;
    return 0;
  }
  /**
@@ -644,8 +651,7 @@
  {
    if (numRcvdUpdates != null)
      return numRcvdUpdates.get();
    else
      return 0;
    return 0;
  }
  /**
@@ -657,8 +663,7 @@
  {
    if (numSentUpdates != null)
      return numSentUpdates.get();
    else
      return 0;
    return 0;
  }
  /**
@@ -745,9 +750,10 @@
          return null;
        }
        if (debugEnabled())
          if (!(msg instanceof HeartbeatMsg))
            TRACER.debugVerbose("Message received <" + msg + ">");
        if (debugEnabled() && !(msg instanceof HeartbeatMsg))
        {
          TRACER.debugVerbose("Message received <" + msg + ">");
        }
        if (msg instanceof AckMsg)
        {
@@ -791,7 +797,7 @@
            if (debugEnabled())
              TRACER.debugInfo(
                  "[IE] processErrorMsg:" + this.serverID +
                  " serviceID: " + this.serviceID +
                  " baseDN: " + this.baseDN +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -862,8 +868,9 @@
    numRcvdUpdates.incrementAndGet();
     byte rsGroupId = broker.getRsGroupId();
    if ( update.isAssured() && (update.getAssuredMode() ==
      AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) )
    if (update.isAssured()
        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
        && rsGroupId == groupId)
    {
      assuredSrReceivedUpdates.incrementAndGet();
    }
@@ -937,7 +944,7 @@
        requested servers. Log problem
        */
        Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
            serviceID, Integer.toString(serverID),
            baseDN, Integer.toString(serverID),
            update.toString(), ack.errorsToString());
        logError(errorMsg);
@@ -1012,8 +1019,8 @@
   */
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will be initialized
    private final int serverToInitialize;
    /** Id of server that will be initialized. */
    private final int serverIdToInitialize;
    private final int initWindow;
@@ -1021,17 +1028,17 @@
    /**
     * Constructor for the ExportThread.
     *
     * @param serverToInitialize
     * @param serverIdToInitialize
     *          serverId of server that will receive entries
     * @param initWindow
     *          The value of the initialization window for flow control between
     *          the importer and the exporter.
     */
    public ExportThread(int serverToInitialize, int initWindow)
    public ExportThread(int serverIdToInitialize, int initWindow)
    {
      super("Export thread from serverId=" + serverID + " to serverId="
          + serverToInitialize);
      this.serverToInitialize = serverToInitialize;
          + serverIdToInitialize);
      this.serverIdToInitialize = serverIdToInitialize;
      this.initWindow = initWindow;
    }
@@ -1047,7 +1054,7 @@
        TRACER.debugInfo("[IE] starting " + this.getName());
      try
      {
        initializeRemote(serverToInitialize, serverToInitialize, null,
        initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
            initWindow);
      } catch (DirectoryException de)
      {
@@ -1069,61 +1076,75 @@
   */
  protected class IEContext
  {
    // The private task that initiated the operation.
    Task initializeTask;
    // The destination in the case of an export
    int exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
    int importSource = RoutableMsg.UNKNOWN_SERVER;
    /** The private task that initiated the operation. */
    private Task initializeTask;
    /** The destination in the case of an export. */
    private int exportTarget = RoutableMsg.UNKNOWN_SERVER;
    /** The source in the case of an import. */
    private int importSource = RoutableMsg.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    /** The total entry count expected to be processed. */
    private long entryCount = 0;
    /** The count for the entry not yet processed. */
    private long entryLeftCount = 0;
    // Exception raised during the initialization.
    DirectoryException exception = null;
    /** Exception raised during the initialization. */
    private DirectoryException exception = null;
    // Whether the context is related to an import or an export.
    boolean importInProgress;
    /** Whether the context is related to an import or an export. */
    private boolean importInProgress;
    // Current counter of messages exchanged during the initialization
    int msgCnt = 0;
    /** Current counter of messages exchanged during the initialization. */
    private int msgCnt = 0;
    // Number of connections lost when we start the initialization.
    // Will help counting connections lost during initialization,
    int initNumLostConnections = 0;
    /**
     * Number of connections lost when we start the initialization. Will help
     * counting connections lost during initialization,
     */
    private int initNumLostConnections = 0;
    // Request message sent when this server has the initializeFromRemote task.
    InitializeRequestMsg initReqMsgSent = null;
    /**
     * Request message sent when this server has the initializeFromRemote task.
     */
    private InitializeRequestMsg initReqMsgSent = null;
    // Start time of the initialization process. ErrorMsg timestamped
    // before thi startTime will be ignored.
    long startTime;
    /**
     * Start time of the initialization process. ErrorMsg timestamped before
     * this startTime will be ignored.
     */
    private long startTime;
    // List fo replicas (DS) connected to the topology when
    // initialization started.
    Set<Integer> startList = new HashSet<Integer>(0);
    /**
     * List for replicas (DS) connected to the topology when initialization
     * started.
     */
    private Set<Integer> startList = new HashSet<Integer>(0);
    // List fo replicas (DS) with a failure (disconnected from the topology)
    // since the initialization started.
    Set<Integer> failureList = new HashSet<Integer>(0);
    /**
     * List for replicas (DS) with a failure (disconnected from the topology)
     * since the initialization started.
     */
    private Set<Integer> failureList = new HashSet<Integer>(0);
    // Flow control during initialization
    // - for each remote server, counter of messages received
    /**
     * Flow control during initialization: for each remote server, counter of
     * messages received.
     */
    private final HashMap<Integer, Integer> ackVals =
      new HashMap<Integer, Integer>();
    // - serverId of the slowest server (the one with the smallest non null
    //   counter)
    /**
     * ServerId of the slowest server (the one with the smallest non null
     * counter).
     */
    private int slowestServerId = -1;
    short exporterProtocolVersion = -1;
    private short exporterProtocolVersion = -1;
    // Window used during this initialization
    int initWindow;
    /** Window used during this initialization. */
    private int initWindow;
    // Number of attempt already done for this initialization
    short attemptCnt;
    /** Number of attempt already done for this initialization. */
    private short attemptCnt;
    /**
     * Creates a new IEContext.
@@ -1137,7 +1158,6 @@
      this.importInProgress = importInProgress;
      this.startTime = System.currentTimeMillis();
      this.attemptCnt = 0;
    }
    /**
@@ -1368,7 +1388,7 @@
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
          countEntries(), serviceID, serverID);
          countEntries(), baseDN, serverID);
      logError(msg);
      for (DSInfo dsi : getReplicasList())
@@ -1384,7 +1404,7 @@
    else
    {
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
          countEntries(), serviceID, serverID, serverToInitialize);
          countEntries(), baseDN, serverID, serverToInitialize);
      logError(msg);
      ieContext.startList.add(serverToInitialize);
@@ -1392,8 +1412,8 @@
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
      {
        if (dsi.getDsId() == serverToInitialize)
         if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          ieContext.setAckVal(dsi.getDsId(), 0);
      }
    }
@@ -1401,7 +1421,7 @@
    // loop for the case where the exporter is the initiator
    int attempt = 0;
    boolean done = false;
    while ((!done) && (++attempt<2)) // attempt loop
    while (!done && ++attempt < 2) // attempt loop
    {
      try
      {
@@ -1415,7 +1435,7 @@
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            serviceID, serverID, serverToInitialize, serverRunningTheTask,
            baseDN, serverID, serverToInitialize, serverRunningTheTask,
            ieContext.entryCount, initWindow);
        broker.publish(initTargetMsg);
@@ -1475,14 +1495,14 @@
              TRACER.debugInfo(
                "[IE] Exporter wait for reconnection by the listener thread");
            int att=0;
            while ((!broker.shuttingDown()) &&
                (!broker.isConnected())&& (++att<100))
            while (!broker.shuttingDown() && !broker.isConnected()
                && ++att < 100)
              try { Thread.sleep(100); }
              catch(Exception e){ /* do nothing */ }
          }
          if ((initTask != null) && broker.isConnected() &&
              (serverToInitialize != RoutableMsg.ALL_SERVERS))
          if (initTask != null && broker.isConnected()
              && serverToInitialize != RoutableMsg.ALL_SERVERS)
          {
            /*
            NewAttempt case : In the case where
@@ -1524,13 +1544,12 @@
    // Servers that left in the list are those for which we could not test
    // that they have been successfully initialized.
    if (!ieContext.failureList.isEmpty())
    if (!ieContext.failureList.isEmpty() && exportRootException == null)
    {
      if (exportRootException == null)
        exportRootException = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
              Long.toString(getGenerationID()),
              ieContext.failureList.toString()));
      exportRootException = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
                  Long.toString(getGenerationID()),
                  ieContext.failureList.toString()));
    }
    // Don't forget to release IEcontext acquired at beginning.
@@ -1541,22 +1560,21 @@
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL
          .get(serviceID, serverID, cause);
          .get(baseDN, serverID, cause);
      logError(msg);
    }
    else
    {
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
          serviceID, serverID, serverToInitialize, cause);
          baseDN, serverID, serverToInitialize, cause);
      logError(msg);
    }
    if (exportRootException != null)
    {
      throw(exportRootException);
      throw exportRootException;
    }
  }
  private String getReplicationMonitorInstanceName()
@@ -1564,10 +1582,10 @@
    return broker.getReplicationMonitor().getMonitorInstanceName();
  }
  /*
   * For all remote servers in tht start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
  /**
   * For all remote servers in the start list:
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteStartOfInit()
  {
@@ -1614,8 +1632,7 @@
        }
      }
    }
    while ((!done) && (waitResultAttempt<1200) // 2mn
        && (!broker.shuttingDown()));
    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
@@ -1624,17 +1641,15 @@
        "[IE] wait for start ends with " + ieContext.failureList);
  }
  /*
   * For all remote servers in the start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
  /**
   * For all remote servers in the start list:
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteEndOfInit()
  {
    Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
      replicasWeAreWaitingFor.add(sid);
    Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(
        ieContext.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1710,7 +1725,7 @@
        } // 1sec
    }
    while ((!done) && (!broker.shuttingDown())); // infinite wait
    while (!done && !broker.shuttingDown()); // infinite wait
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
@@ -1908,7 +1923,7 @@
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      this.serviceID,
                      this.baseDN,
                      Integer.toString(this.serverID),
                      Integer.toString(ieContext.importSource)));
            if (ieContext.getException()==null)
@@ -1994,7 +2009,7 @@
      we just abandon the export by throwing an exception.
      */
      if (ieContext.getException() != null)
        throw(new IOException(ieContext.getException().getMessage()));
        throw new IOException(ieContext.getException().getMessage());
      int slowestServerId = ieContext.getSlowestServer();
      if (isRemoteDSConnected(slowestServerId)==null)
@@ -2162,7 +2177,7 @@
    if (!broker.isConnected())
    {
      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString());
    }
    /*
@@ -2186,7 +2201,7 @@
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
          serviceID, serverID, source, this.initWindow);
          baseDN, serverID, source, this.initWindow);
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
@@ -2217,10 +2232,7 @@
      // No need to call here updateTaskCompletionState - will be done
      // by the caller
      releaseIEContext();
      DirectoryException de = new DirectoryException(
          ResultCode.OTHER,
          errMsg);
      throw (de);
      throw new DirectoryException(ResultCode.OTHER, errMsg);
    }
  }
@@ -2250,7 +2262,7 @@
    {
      // Log starting
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
          serviceID, initTargetMsgReceived.getSenderID(), serverID);
          baseDN, initTargetMsgReceived.getSenderID(), serverID);
      logError(msg);
      // Go into full update status
@@ -2305,11 +2317,11 @@
      */
      broker.reStart(false);
      if (ieContext.getException() != null)
      if (ieContext.getException() != null
          && broker.isConnected()
          && initFromTask != null
          && ++ieContext.attemptCnt < 2)
      {
        if (broker.isConnected() && (initFromTask != null)
            && (++ieContext.attemptCnt<2))
        {
          /*
          Worth a new attempt
          since initFromTask is in this server, connection is ok
@@ -2343,13 +2355,12 @@
          {
            /*
            An error occurs when sending a new request for a new import.
            This error is not stored, prefering to keep the initial one.
            This error is not stored, preferring to keep the initial one.
            */
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
          }
        }
      }
      // ===================
@@ -2364,7 +2375,7 @@
      try
      {
        if (broker.isConnected() && (ieContext.getException() != null))
        if (broker.isConnected() && ieContext.getException() != null)
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
@@ -2385,7 +2396,7 @@
      finally
      {
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            serviceID, initTargetMsgReceived.getSenderID(), serverID,
            baseDN, initTargetMsgReceived.getSenderID(), serverID,
            (ieContext.getException() != null ? ieContext
                .getException().getLocalizedMessage() : ""));
        logError(msg);
@@ -2426,7 +2437,7 @@
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(serviceID,
      Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDN,
          Integer.toString(serverID), status.toString(), event.toString());
      logError(msg);
      return;
@@ -2444,8 +2455,8 @@
      status = newStatus;
      if (debugEnabled())
        TRACER.debugInfo("Replication domain " + serviceID +
          " new status is: " + status);
        TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
            + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
@@ -2461,7 +2472,7 @@
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
    return ieContext != null;
  }
  /**
@@ -2483,8 +2494,8 @@
      for (RSInfo rsInfo : getRsList())
      {
        // the 'empty' RSes (generationId==-1) are considered as good citizens
        if ((rsInfo.getGenerationId() != -1) &&
            (rsInfo.getGenerationId() != generationID))
        if (rsInfo.getGenerationId() != -1 &&
            rsInfo.getGenerationId() != generationID)
        {
          try
          {
@@ -2505,7 +2516,7 @@
    if (!allSet)
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(baseDN);
      throw new DirectoryException(
          resultCode, message);
    }
@@ -2536,7 +2547,7 @@
    // wait for the domain to reconnect.
    int count = 0;
    while (!isConnected() && (count < 10))
    while (!isConnected() && count < 10)
    {
      try
      {
@@ -2565,8 +2576,7 @@
  throws DirectoryException
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "Server id " + serverID + " and domain " + serviceID
      TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
          + " resetGenerationId " + generationIdNewValue);
    ResetGenerationIdMsg genIdMessage;
@@ -2583,7 +2593,7 @@
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID,
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(baseDN,
          Integer.toString(serverID),
          Long.toString(genIdMessage.getGenerationId()));
      throw new DirectoryException(
@@ -2620,8 +2630,7 @@
  {
    if (broker != null)
      return broker.getMaxRcvWindow();
    else
      return 0;
    return 0;
  }
  /**
@@ -2633,8 +2642,7 @@
  {
    if (broker != null)
      return broker.getCurrentRcvWindow();
    else
      return 0;
    return 0;
  }
  /**
@@ -2646,8 +2654,7 @@
  {
    if (broker != null)
      return broker.getMaxSendWindow();
    else
      return 0;
    return 0;
  }
  /**
@@ -2659,8 +2666,7 @@
  {
    if (broker != null)
      return broker.getCurrentSendWindow();
    else
      return 0;
    return 0;
  }
  /**
@@ -2671,8 +2677,7 @@
  {
    if (broker != null)
      return broker.getNumLostConnections();
    else
      return 0;
    return 0;
  }
  /**
@@ -2736,8 +2741,7 @@
  {
    if (broker != null)
      return broker.getReplicationServer();
    else
      return ReplicationBroker.NO_CONNECTED_SERVER;
    return ReplicationBroker.NO_CONNECTED_SERVER;
  }
  /**
@@ -2965,11 +2969,9 @@
    {
      if (broker == null)
      {
        /*
         * create the broker object used to publish and receive changes
         */
        // create the broker object used to publish and receive changes
        broker = new ReplicationBroker(
            this, state, serviceID,
            this, state, baseDN,
            serverID, window,
            getGenerationID(),
            heartbeatInterval,
@@ -2996,7 +2998,6 @@
  {
    synchronized (sessionLock)
    {
      //
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      listenerThread.start();
@@ -3066,7 +3067,7 @@
  public void stopDomain()
  {
    disableService();
    domains.remove(serviceID);
    domains.remove(baseDN);
  }
  /**
@@ -3087,14 +3088,12 @@
  {
    this.groupId = groupId;
    if (broker != null)
    if (broker != null
        && broker.changeConfig(replicationServers, windowSize,
            heartbeatInterval, groupId))
    {
      if (broker.changeConfig(
          replicationServers, windowSize, heartbeatInterval, groupId))
      {
        disableService();
        enableService();
      }
      disableService();
      enableService();
    }
  }
@@ -3112,14 +3111,11 @@
  public void changeConfig(Set<String> includeAttributes,
      Set<String> includeAttributesForDeletes)
  {
    if (setEclIncludes(serverID, includeAttributes,
        includeAttributesForDeletes))
    if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
        && broker != null)
    {
      if (broker != null)
      {
        disableService();
        enableService();
      }
      disableService();
      enableService();
    }
  }
@@ -3239,7 +3235,7 @@
        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        {
          Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
              Integer.toString(serverID), msgAssuredMode.toString(), serviceID,
              Integer.toString(serverID), msgAssuredMode.toString(), baseDN,
            msg.toString());
          logError(errorMsg);
        }
@@ -3283,17 +3279,14 @@
     * geographical location). If we are connected to a RS which is not in our
     * locality, no need to ask for an ack.
     */
    if (assured && (rsGroupId == groupId))
    if (assured && rsGroupId == groupId)
    {
      msg.setAssured(true);
      msg.setAssuredMode(assuredMode);
      if (assuredMode == AssuredMode.SAFE_DATA_MODE)
        msg.setSafeDataLevel(assuredSdLevel);
      /*
      Add the assured message to the list of update that are
      waiting for acks
      */
      // Add the assured message to the list of update that are waiting for acks
      waitingAckMsgs.put(msg.getChangeNumber(), msg);
    }
  }
@@ -3315,7 +3308,7 @@
    // If assured mode configured, wait for acknowledgement for the just sent
    // message
    if (assured && (rsGroupId == groupId))
    if (assured && rsGroupId == groupId)
    {
      // Increment assured replication monitoring counters
      switch (assuredMode)
@@ -3354,7 +3347,7 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "serviceID: " + serviceID);
              "baseDN: " + baseDN);
          }
          break;
        }
@@ -3394,7 +3387,7 @@
            }
            throw new TimeoutException("No ack received for message cn: " + cn +
              " and replication servceID: " + serviceID + " after " +
              " and replication servceID: " + baseDN + " after " +
              assuredTimeout + " ms.");
          } else
          {
@@ -3458,7 +3451,7 @@
    {
      // This exception may only be raised if assured replication is
      // enabled
      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(baseDN, Long.toString(
        assuredTimeout), update.toString());
      logError(errorMsg);
    }
@@ -3521,8 +3514,7 @@
  {
    if (ieContext != null)
      return ieContext.entryLeftCount;
    else
      return 0;
    return 0;
  }
  /**
@@ -3548,8 +3540,7 @@
  {
    if (ieContext != null)
      return ieContext.entryCount;
    else
      return 0;
    return 0;
  }
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -27,19 +27,14 @@
 */
package org.opends.server.replication.service;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.Attributes;
import org.opends.server.types.*;
/**
 * Class used to generate monitoring information for the replication.
@@ -77,7 +72,7 @@
  {
    return "Directory server DS(" + domain.getServerId() + ") "
        + domain.getLocalUrl() + ",cn="
        + domain.getServiceID().replace(',', '_').replace('=', '_')
        + domain.getBaseDNString().replace(',', '_').replace('=', '_')
        + ",cn=Replication";
  }
@@ -95,7 +90,7 @@
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    /* get the base dn */
    Attribute attr = Attributes.create("domain-name", domain.getServiceID());
    Attribute attr = Attributes.create("domain-name", domain.getBaseDNString());
    attributes.add(attr);
    /* get the base dn */
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -23,32 +23,24 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.tasks;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.messages.Message;
import org.opends.messages.TaskMessages;
import org.opends.server.types.ResultCode;
import org.opends.messages.MessageBuilder;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import java.util.List;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.*;
/**
 * This class provides an implementation of a Directory Server task that can
@@ -70,6 +62,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public Message getDisplayName() {
    return TaskMessages.INFO_TASK_INITIALIZE_TARGET_NAME.get();
  }
@@ -123,11 +116,12 @@
  /**
   * {@inheritDoc}
   */
  @Override
  protected TaskState runTask()
  {
    if (debugEnabled())
      TRACER.debugInfo("[IE] InitializeTargetTask is starting on domain: "+
          domain.getServiceID());
      TRACER.debugInfo("[IE] InitializeTargetTask is starting on domain: "
          + domain.getBaseDNString());
    try
    {
opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -23,33 +23,24 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.tasks;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.ResultCode;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Message;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.List;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.*;
/**
 * This class provides an implementation of a Directory Server task that can
@@ -63,24 +54,29 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private String  domainString            = null;
  private String domainString;
  private int  source;
  private LDAPReplicationDomain domain        = null;
  private LDAPReplicationDomain domain;
  private TaskState initState;
  // The total number of entries expected to be processed when this import
  // will end successfully
  long total = 0;
  /**
   * The total number of entries expected to be processed when this import will
   * end successfully.
   */
  private long total = 0;
  // The number of entries still to be processed for this import to be
  // completed
  long left = 0;
  /**
   * The number of entries still to be processed for this import to be
   * completed.
   */
  private long left = 0;
  private Message taskCompletionError = null;
  /**
   * {@inheritDoc}
   */
  @Override
  public Message getDisplayName() {
    return TaskMessages.INFO_TASK_INITIALIZE_NAME.get();
  }
@@ -135,12 +131,13 @@
  /**
   * {@inheritDoc}
   */
  @Override
  protected TaskState runTask()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("[IE] InitializeTask is starting on domain: %s "
          + " from source:%d", domain.getServiceID(), source);
          + " from source:%d", domain.getBaseDNString(), source);
    }
    initState = getTaskState();
    try
opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
@@ -23,36 +23,24 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.tasks;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.ResultCode;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Message;
import org.opends.messages.Category;
import org.opends.messages.Severity;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.List;
import org.opends.messages.TaskMessages;
import org.opends.messages.*;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
/**
 * This class provides an implementation of a Directory Server task that can
@@ -182,7 +170,7 @@
    if (debugEnabled())
    {
      debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting "
          + "on domain: " + domain.getServiceID()
          + "on domain: " + domain.getBaseDNString()
          + "max duration (sec):" + purgeTaskMaxDurationInSec);
    }
    try
opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java
@@ -23,30 +23,25 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.tasks;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.service.ReplicationDomain;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.List;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.messages.Message;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.*;
/**
 * This class provides an implementation of a Directory Server task that can
@@ -63,18 +58,10 @@
  private ReplicationDomain domain        = null;
  private Long generationId = null;
  private static final void debugInfo(String s)
  {
    if (debugEnabled())
    {
      // System.out.println(Message.raw(Category.SYNC, Severity.NOTICE, s));
      TRACER.debugInfo(s);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public Message getDisplayName() {
    return TaskMessages.INFO_TASK_SET_GENERATION_ID_NAME.get();
  }
@@ -82,10 +69,9 @@
  /**
   * {@inheritDoc}
   */
  @Override public void initializeTask() throws DirectoryException
  @Override
  public void initializeTask() throws DirectoryException
  {
    List<Attribute> attrList;
    if (TaskState.isDone(getTaskState()))
    {
      return;
@@ -95,15 +81,14 @@
    Entry taskEntry = getTaskEntry();
    // Retrieves the eventual generation-ID
    AttributeType typeNewValue;
    typeNewValue =
    AttributeType typeNewValue =
      getAttributeType(ATTR_TASK_SET_GENERATION_ID_NEW_VALUE, true);
    attrList = taskEntry.getAttribute(typeNewValue);
    List<Attribute> attrList = taskEntry.getAttribute(typeNewValue);
    if ((attrList != null) && !attrList.isEmpty())
    {
      try
      {
        generationId = new Long(TaskUtils.getSingleValueString(attrList));
        generationId = Long.parseLong(TaskUtils.getSingleValueString(attrList));
      }
      catch(Exception e)
      {
@@ -116,8 +101,7 @@
    }
    // Retrieves the replication domain
    AttributeType typeDomainBase;
    typeDomainBase =
    AttributeType typeDomainBase =
      getAttributeType(ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN, true);
    attrList = taskEntry.getAttribute(typeDomainBase);
@@ -140,10 +124,14 @@
  /**
   * {@inheritDoc}
   */
  @Override
  protected TaskState runTask()
  {
    debugInfo("setGenerationIdTask is starting on domain%s" +
        domain.getServiceID());
    if (debugEnabled())
    {
      TRACER.debugInfo("setGenerationIdTask is starting on domain %s"
              + domain.getBaseDNString());
    }
    try
    {
@@ -155,7 +143,10 @@
      return TaskState.STOPPED_BY_ERROR;
    }
    debugInfo("setGenerationIdTask is ending SUCCESSFULLY");
    if (debugEnabled())
    {
      TRACER.debugInfo("setGenerationIdTask is ending SUCCESSFULLY");
    }
    return TaskState.COMPLETED_SUCCESSFULLY;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -27,6 +27,11 @@
 */
package org.opends.server.replication.server;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
@@ -56,11 +61,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
/**
 * Test Server part of the assured feature in both safe data and
 * safe read modes.
@@ -595,7 +595,7 @@
    /**
     * Creates a fake replication domain (DS)
     * @param serviceID The base dn used at connection to RS
     * @param baseDN The base dn used at connection to RS
     * @param serverID our server id
     * @param generationId the generation id we use at connection to real RS
     * @param groupId our group id
@@ -608,7 +608,7 @@
     * @throws org.opends.server.config.ConfigException
     */
    public FakeReplicationDomain(
      String serviceID,
      String baseDN,
      int serverID,
      long generationId,
      byte groupId,
@@ -619,7 +619,7 @@
      int scenario,
      ServerState serverState) throws ConfigException
    {
      super(serviceID, serverID, serverState);
      super(baseDN, serverID, serverState);
      this.generationId = generationId;
      setGroupId(groupId);
      setAssured(assured);
@@ -769,7 +769,8 @@
    public void sendNewFakeUpdate(boolean useAssured) throws TimeoutException
    {
      // Create a new delete update message (the simplest to create)
      DeleteMsg delMsg = new DeleteMsg(getServiceID(), gen.newChangeNumber(),
      DeleteMsg delMsg =
          new DeleteMsg(getBaseDNString(), gen.newChangeNumber(),
        UUID.randomUUID().toString());
      // Send it (this uses the defined assured conf at constructor time)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -68,21 +68,21 @@
  private long generationID = 1;
  public FakeReplicationDomain(
      String serviceID,
      String baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(serviceID, serverID, 100);
    super(baseDN, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.queue = queue;
  }
  public FakeReplicationDomain(
      String serviceID,
      String baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
@@ -91,7 +91,7 @@
      StringBuilder importString,
      int exportedEntryCount) throws ConfigException
  {
    super(serviceID, serverID, 100);
    super(baseDN, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.exportString = exportString;
@@ -119,7 +119,6 @@
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
          ERR_BACKEND_EXPORT_ENTRY.get("", ""));
    }
  }
  @Override
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -55,14 +55,14 @@
  private BlockingQueue<UpdateMsg> queue = null;
  public FakeStressReplicationDomain(
      String serviceID,
      String baseDN,
      int serverID,
      Collection<String> replicationServers,
      int window,
      long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(serviceID, serverID, 100);
    super(baseDN, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.queue = queue;