mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.30.2014 e33ecc4dd66d4c514cd7ad52d348a0be63a7f1eb
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -146,14 +146,14 @@
   * The context related to an import or export being processed
   * Null when none is being processed.
   */
  protected IEContext ieContext = null;
  volatile IEContext ieContext;
  /**
   * The Thread waiting for incoming update messages for this domain and pushing
   * them to the global incoming update message queue for later processing by
   * replay threads.
   */
  private volatile DirectoryThread listenerThread = null;
  private volatile DirectoryThread listenerThread;
  /**
   * A set of counters used for Monitoring.
@@ -732,7 +732,8 @@
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          if (ieContext != null)
          IEContext ieCtx = ieContext;
          if (ieCtx != null)
          {
            /*
            This is an error termination for the 2 following cases :
@@ -750,10 +751,10 @@
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
            if (errorMsg.getCreationTime() > ieCtx.startTime)
            {
              // consider only ErrorMsg that relate to the current import/export
              processErrorMsg(errorMsg);
              processErrorMsg(errorMsg, ieCtx);
            }
            else
            {
@@ -784,10 +785,11 @@
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          if (ieContext != null)
          IEContext ieCtx = ieContext;
          if (ieCtx != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
            ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
          }
          // Trash this msg When no input/export is running/should never happen
        }
@@ -1044,7 +1046,7 @@
    private long entryLeftCount = 0;
    /** Exception raised during the initialization. */
    private DirectoryException exception = null;
    private DirectoryException exception;
    /** Whether the context is related to an import or an export. */
    private boolean importInProgress;
@@ -1061,7 +1063,7 @@
    /**
     * Request message sent when this server has the initializeFromRemote task.
     */
    private InitializeRequestMsg initReqMsgSent = null;
    private InitializeRequestMsg initReqMsgSent;
    /**
     * Start time of the initialization process. ErrorMsg timestamped before
@@ -1116,12 +1118,47 @@
    }
    /**
     * Returns a boolean indicating if a total update import is currently in
     * Progress.
     *
     * @return A boolean indicating if a total update import is currently in
     *         Progress.
     */
    public boolean importInProgress()
    {
      return importInProgress;
    }
    /**
     * Returns the total number of entries to be processed when a total update
     * is in progress.
     *
     * @return The total number of entries to be processed when a total update
     *         is in progress.
     */
    long getTotalEntryCount()
    {
      return entryCount;
    }
    /**
     * Returns the number of entries still to be processed when a total update
     * is in progress.
     *
     * @return The number of entries still to be processed when a total update
     *         is in progress.
     */
    long getLeftEntryCount()
    {
      return entryLeftCount;
    }
    /**
     * Initializes the import/export counters with the provider value.
     * @param total Total number of entries to be processed.
     * @throws DirectoryException if an error occurred.
     */
    private void initializeCounters(long total)
      throws DirectoryException
    private void initializeCounters(long total) throws DirectoryException
    {
      entryCount = total;
      entryLeftCount = total;
@@ -1150,8 +1187,7 @@
     *
     * @throws DirectoryException if an error occurred.
     */
    public void updateCounters(int entriesDone)
      throws DirectoryException
    public void updateCounters(int entriesDone) throws DirectoryException
    {
      entryLeftCount -= entriesDone;
@@ -1358,6 +1394,7 @@
    - to update the task with the server(s) where this test failed
    */
    IEContext ieCtx = ieContext;
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
@@ -1365,7 +1402,7 @@
      for (DSInfo dsi : getReplicasList())
      {
        ieContext.startList.add(dsi.getDsId());
        ieCtx.startList.add(dsi.getDsId());
      }
      // We manage the list of servers with which a flow control can be enabled
@@ -1373,7 +1410,7 @@
      {
        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieContext.setAckVal(dsi.getDsId(), 0);
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
@@ -1382,7 +1419,7 @@
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
          getBaseDNString(), getServerId(), serverToInitialize));
      ieContext.startList.add(serverToInitialize);
      ieCtx.startList.add(serverToInitialize);
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
@@ -1390,7 +1427,7 @@
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieContext.setAckVal(dsi.getDsId(), 0);
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
@@ -1402,34 +1439,34 @@
    {
      try
      {
        ieContext.exportTarget = serverToInitialize;
        ieCtx.exportTarget = serverToInitialize;
        if (initTask != null)
        {
          ieContext.initializeTask = initTask;
          ieCtx.initializeTask = initTask;
        }
        ieContext.initializeCounters(this.countEntries());
        ieContext.msgCnt = 0;
        ieContext.initNumLostConnections = broker.getNumLostConnections();
        ieContext.initWindow = initWindow;
        ieCtx.initializeCounters(this.countEntries());
        ieCtx.msgCnt = 0;
        ieCtx.initNumLostConnections = broker.getNumLostConnections();
        ieCtx.initWindow = initWindow;
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            getBaseDN(), getServerId(), serverToInitialize,
            serverRunningTheTask, ieContext.entryCount, initWindow);
            serverRunningTheTask, ieCtx.entryCount, initWindow);
        broker.publish(initTargetMsg);
        // Wait for all servers to be ok
        waitForRemoteStartOfInit();
        waitForRemoteStartOfInit(ieCtx);
        // Servers that left in the list are those for which we could not test
        // that they have been successfully initialized.
        if (!ieContext.failureList.isEmpty())
        if (!ieCtx.failureList.isEmpty())
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
                  ieContext.failureList.toString()));
                  ieCtx.failureList.toString()));
        }
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
@@ -1441,7 +1478,7 @@
      catch(DirectoryException exportException)
      {
        // Give priority to the first exception raised - stored in the context
        final DirectoryException ieEx = ieContext.exception;
        final DirectoryException ieEx = ieCtx.exception;
        exportRootException = ieEx != null ? ieEx : exportException;
      }
@@ -1500,10 +1537,8 @@
            continue;
          }
          ErrorMsg errorMsg =
              new ErrorMsg(serverToInitialize,
                  exportRootException.getMessageObject());
          broker.publish(errorMsg);
          broker.publish(new ErrorMsg(
              serverToInitialize, exportRootException.getMessageObject()));
        }
        catch(Exception e)
        {
@@ -1518,16 +1553,16 @@
    } // attempt loop
    // Wait for all servers to be ok, and build the failure list
    waitForRemoteEndOfInit();
    waitForRemoteEndOfInit(ieCtx);
    // Servers that left in the list are those for which we could not test
    // that they have been successfully initialized.
    if (!ieContext.failureList.isEmpty() && exportRootException == null)
    if (!ieCtx.failureList.isEmpty() && exportRootException == null)
    {
      exportRootException = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
                  Long.toString(getGenerationID()),
                  ieContext.failureList.toString()));
                  ieCtx.failureList.toString()));
    }
    // Don't forget to release IEcontext acquired at beginning.
@@ -1558,10 +1593,10 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteStartOfInit()
  private void waitForRemoteStartOfInit(IEContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieContext.startList);
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1580,7 +1615,7 @@
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + getGenerationID());
        if (ieContext.startList.contains(dsi.getDsId()))
        if (ieCtx.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
          {
@@ -1604,11 +1639,11 @@
    }
    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for start ends with " + ieContext.failureList);
        "[IE] wait for start ends with " + ieCtx.failureList);
  }
  /**
@@ -1616,10 +1651,10 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteEndOfInit()
  private void waitForRemoteEndOfInit(IEContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieContext.startList);
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1645,7 +1680,7 @@
      while (it.hasNext())
      {
        int serverId = it.next();
        if (ieContext.failureList.contains(serverId))
        if (ieCtx.failureList.contains(serverId))
        {
          /*
          this server has already been in error during initialization
@@ -1701,11 +1736,11 @@
    }
    while (!done && !broker.shuttingDown()); // infinite wait
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end ends with " + ieContext.failureList);
        "[IE] wait for end ends with " + ieCtx.failureList);
  }
  /**
@@ -1743,13 +1778,13 @@
   *
   * @param errorMsg The error message received.
   */
  private void processErrorMsg(ErrorMsg errorMsg)
  private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
  {
    //Exporting must not be stopped on the first error, if we run initialize-all
    if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
    {
      // The ErrorMsg is received while we have started an initialization
      ieContext.setExceptionIfNoneSet(new DirectoryException(
      ieCtx.setExceptionIfNoneSet(new DirectoryException(
          ResultCode.OTHER, errorMsg.getDetails()));
      /*
@@ -1759,11 +1794,11 @@
       *   even after the nextInitAttemptDelay
       * During the import, the ErrorMsg will be received by receiveEntryBytes
       */
      if (ieContext.initializeTask instanceof InitializeTask)
      if (ieCtx.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask) ieContext.initializeTask)
            .updateTaskCompletionState(ieContext.getException());
        ((InitializeTask) ieCtx.initializeTask)
            .updateTaskCompletionState(ieCtx.getException());
        releaseIEContext();
      }
@@ -1781,6 +1816,7 @@
    ReplicationMsg msg;
    while (true)
    {
      IEContext ieCtx = ieContext;
      try
      {
        // In the context of the total update, we don't want any automatic
@@ -1807,7 +1843,7 @@
          else
          {
            // Handle connection issues
            ieContext.setExceptionIfNoneSet(new DirectoryException(
            ieCtx.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
                    .get(broker.getReplicationServer())));
            return null;
@@ -1819,26 +1855,26 @@
        {
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters(countEntryLimits(entryBytes));
          ieCtx.updateCounters(countEntryLimits(entryBytes));
          if (ieContext.exporterProtocolVersion >=
          if (ieCtx.exporterProtocolVersion >=
            ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // check the msgCnt of the msg received to check ordering
            if (++ieContext.msgCnt != entryMsg.getMsgId())
            if (++ieCtx.msgCnt != entryMsg.getMsgId())
            {
              ieContext.setExceptionIfNoneSet(new DirectoryException(
              ieCtx.setExceptionIfNoneSet(new DirectoryException(
                  ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
                      String.valueOf(ieContext.msgCnt),
                      String.valueOf(ieCtx.msgCnt),
                      String.valueOf(entryMsg.getMsgId()))));
              return null;
            }
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
            {
              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
              {
@@ -1864,12 +1900,12 @@
          This is an error termination during the import
          The error is stored and the import is ended by returning null
          */
          if (ieContext.getException() == null)
          if (ieCtx.getException() == null)
          {
            ErrorMsg errMsg = (ErrorMsg)msg;
            if (errMsg.getCreationTime() > ieContext.startTime)
            if (errMsg.getCreationTime() > ieCtx.startTime)
            {
              ieContext.setException(
              ieCtx.setException(
                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
              return null;
            }
@@ -1880,15 +1916,15 @@
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if (msg instanceof TopologyMsg
              && isRemoteDSConnected(ieContext.importSource) == null)
              && isRemoteDSConnected(ieCtx.importSource) == null)
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      getBaseDNString(),
                      Integer.toString(getServerId()),
                      Integer.toString(ieContext.importSource)));
            ieContext.setExceptionIfNoneSet(new DirectoryException(
                      Integer.toString(ieCtx.importSource)));
            ieCtx.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, errMsg));
            return null;
          }
@@ -1896,7 +1932,7 @@
      }
      catch(Exception e)
      {
        ieContext.setExceptionIfNoneSet(new DirectoryException(
        ieCtx.setExceptionIfNoneSet(new DirectoryException(
            ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
      }
@@ -1957,9 +1993,10 @@
          Arrays.toString(lDIFEntry));
    // build the message
    IEContext ieCtx = ieContext;
    EntryMsg entryMessage = new EntryMsg(
        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
        ++ieCtx.msgCnt);
    // Waiting the slowest loop
    while (!broker.shuttingDown())
@@ -1969,30 +2006,30 @@
      server that have been stored by the listener thread in the ieContext,
      we just abandon the export by throwing an exception.
      */
      if (ieContext.getException() != null)
      if (ieCtx.getException() != null)
      {
        throw new IOException(ieContext.getException().getMessage());
        throw new IOException(ieCtx.getException().getMessage());
      }
      int slowestServerId = ieContext.getSlowestServer();
      int slowestServerId = ieCtx.getSlowestServer();
      if (isRemoteDSConnected(slowestServerId)==null)
      {
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
                Integer.toString(ieContext.getSlowestServer()))));
                Integer.toString(ieCtx.getSlowestServer()))));
        throw new IOException("IOException with nested DirectoryException",
            ieContext.getException());
            ieCtx.getException());
      }
      int ourLastExportedCnt = ieContext.msgCnt;
      int slowestCnt = ieContext.ackVals.get(slowestServerId);
      int ourLastExportedCnt = ieCtx.msgCnt;
      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
      if (debugEnabled())
        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
      if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
      if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow)
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
@@ -2003,13 +2040,13 @@
        // process any connection error
        if (broker.hasConnectionError()
          || broker.getNumLostConnections() != ieContext.initNumLostConnections)
          || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
        {
          // publish failed - store the error in the ieContext ...
          DirectoryException de = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
                  Integer.toString(broker.getRsServerId())));
          ieContext.setExceptionIfNoneSet(de);
          ieCtx.setExceptionIfNoneSet(de);
          // .. and abandon the export by throwing an exception.
          throw new IOException(de.getMessage());
        }
@@ -2031,13 +2068,13 @@
    // process any publish error
    if (!sent
        || broker.hasConnectionError()
        || broker.getNumLostConnections() != ieContext.initNumLostConnections)
        || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
              Integer.toString(broker.getRsServerId())));
      ieContext.setExceptionIfNoneSet(de);
      ieCtx.setExceptionIfNoneSet(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
@@ -2045,11 +2082,11 @@
    // publish succeeded
    try
    {
      ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
      ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
    }
    catch (DirectoryException de)
    {
      ieContext.setExceptionIfNoneSet(de);
      ieCtx.setExceptionIfNoneSet(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
@@ -2157,13 +2194,12 @@
      */
      acquireIEContext(true);  //test and set if no import already in progress
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
      IEContext ieCtx = ieContext;
      ieCtx.initializeTask = initTask;
      ieCtx.attemptCnt = 0;
      ieCtx.initReqMsgSent = new InitializeRequestMsg(
          getBaseDN(), getServerId(), source, getInitWindow());
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
      broker.publish(ieCtx.initReqMsgSent);
      /*
      The normal success processing is now to receive InitTargetMsg then
@@ -2219,6 +2255,7 @@
    int source = initTargetMsgReceived.getSenderID();
    IEContext ieCtx = ieContext;
    try
    {
      // Log starting
@@ -2240,12 +2277,12 @@
      }
      // Initialize stuff
      ieContext.importSource = source;
      ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
      ieCtx.importSource = source;
      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieContext.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask)ieContext.initializeTask;
      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask) ieCtx.initializeTask;
      // Launch the import
      importBackend(new ReplInputStream(this));
@@ -2257,14 +2294,14 @@
      Store the exception raised. It will be considered if no other exception
      has been previously stored in  the context
      */
      ieContext.setExceptionIfNoneSet(e);
      ieCtx.setExceptionIfNoneSet(e);
    }
    finally
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends import with exception=" + ieContext.getException()
          + " ends import with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected());
      }
@@ -2278,10 +2315,10 @@
      */
      broker.reStart(false);
      if (ieContext.getException() != null
      if (ieCtx.getException() != null
          && broker.isConnected()
          && initFromTask != null
          && ++ieContext.attemptCnt < 2)
          && ++ieCtx.attemptCnt < 2)
      {
          /*
          Worth a new attempt
@@ -2300,13 +2337,13 @@
            the request
            */
            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                ieContext.getException().getLocalizedMessage()));
                ieCtx.getException().getLocalizedMessage()));
            broker.publish(ieContext.initReqMsgSent);
            broker.publish(ieCtx.initReqMsgSent);
            ieContext.initializeCounters(0);
            ieContext.exception = null;
            ieContext.msgCnt = 0;
            ieCtx.initializeCounters(0);
            ieCtx.exception = null;
            ieCtx.msgCnt = 0;
            // Processing of the received initTargetMsgReceived is done
            // let's wait for the next one
@@ -2320,7 +2357,7 @@
            */
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
              ieCtx.getException().getLocalizedMessage()));
          }
      }
@@ -2330,19 +2367,19 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends initialization with exception=" + ieContext.getException()
          + " ends initialization with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromTask
          + " attempt=" + ieContext.attemptCnt);
          + " attempt=" + ieCtx.attemptCnt);
      }
      try
      {
        if (broker.isConnected() && ieContext.getException() != null)
        if (broker.isConnected() && ieCtx.getException() != null)
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
              ieContext.getException().getMessageObject());
              ieCtx.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        /*
@@ -2353,7 +2390,7 @@
        */
        if (initFromTask != null)
        {
          initFromTask.updateTaskCompletionState(ieContext.getException());
          initFromTask.updateTaskCompletionState(ieCtx.getException());
        }
      }
      finally
@@ -2361,8 +2398,8 @@
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            getBaseDNString(), initTargetMsgReceived.getSenderID(),
            getServerId(),
            (ieContext.getException() == null ? ""
                : ieContext.getException().getLocalizedMessage()));
            (ieCtx.getException() == null ? ""
                : ieCtx.getException().getLocalizedMessage()));
        logError(msg);
        releaseIEContext();
      } // finally
@@ -3449,43 +3486,13 @@
  }
  /**
   * Returns a boolean indicating if a total update import is currently
   * in Progress.
   * Returns the Import/Export context associated to this ReplicationDomain.
   *
   * @return A boolean indicating if a total update import is currently
   *         in Progress.
   * @return the Import/Export context associated to this ReplicationDomain
   */
  public boolean importInProgress()
  protected IEContext getImportExportContext()
  {
    return ieContext != null && ieContext.importInProgress;
  }
  /**
   * Returns a boolean indicating if a total update export is currently
   * in Progress.
   *
   * @return A boolean indicating if a total update export is currently
   *         in Progress.
   */
  public boolean exportInProgress()
  {
    return ieContext != null && !ieContext.importInProgress;
  }
  /**
   * Returns the number of entries still to be processed when a total update
   * is in progress.
   *
   * @return The number of entries still to be processed when a total update
   *         is in progress.
   */
  long getLeftEntryCount()
  {
    if (ieContext != null)
    {
      return ieContext.entryLeftCount;
    }
    return 0;
    return ieContext;
  }
  /**
@@ -3501,24 +3508,6 @@
  }
  /**
   * Returns the total number of entries to be processed when a total update
   * is in progress.
   *
   * @return The total number of entries to be processed when a total update
   *         is in progress.
   */
  long getTotalEntryCount()
  {
    if (ieContext != null)
    {
      return ieContext.entryCount;
    }
    return 0;
  }
  /**
   * Set the attributes configured on a server to be included in the ECL.
   *
   * @param serverId
@@ -3617,7 +3606,7 @@
   *          The serverId for which we want the include attributes.
   * @return The attributes.
   */
  public Set<String> getEclIncludes(int serverId)
  Set<String> getEclIncludes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
@@ -3635,7 +3624,7 @@
   *          The serverId for which we want the include attributes.
   * @return The attributes.
   */
  public Set<String> getEclIncludesForDeletes(int serverId)
  Set<String> getEclIncludesForDeletes(int serverId)
  {
    synchronized (eclIncludesLock)
    {