mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
09.45.2014 473e06f2b7f0c9c57ce90c4ef5b8347bb0c13adf
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -146,14 +147,15 @@
   * The context related to an import or export being processed
   * Null when none is being processed.
   */
  protected IEContext ieContext = null;
  private final AtomicReference<IEContext> ieContext =
      new AtomicReference<IEContext>();
  /**
   * The Thread waiting for incoming update messages for this domain and pushing
   * them to the global incoming update message queue for later processing by
   * replay threads.
   */
  private volatile DirectoryThread listenerThread = null;
  private volatile DirectoryThread listenerThread;
  /**
   * A set of counters used for Monitoring.
@@ -732,7 +734,8 @@
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          if (ieContext != null)
          IEContext ieCtx = ieContext.get();
          if (ieCtx != null)
          {
            /*
            This is an error termination for the 2 following cases :
@@ -750,10 +753,10 @@
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
            if (errorMsg.getCreationTime() > ieCtx.startTime)
            {
              // consider only ErrorMsg that relate to the current import/export
              processErrorMsg(errorMsg);
              processErrorMsg(errorMsg, ieCtx);
            }
            else
            {
@@ -784,10 +787,11 @@
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          if (ieContext != null)
          IEContext ieCtx = ieContext.get();
          if (ieCtx != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
            ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
          }
          // Trash this msg When no input/export is running/should never happen
        }
@@ -1044,7 +1048,7 @@
    private long entryLeftCount = 0;
    /** Exception raised during the initialization. */
    private DirectoryException exception = null;
    private DirectoryException exception;
    /** Whether the context is related to an import or an export. */
    private boolean importInProgress;
@@ -1061,7 +1065,7 @@
    /**
     * Request message sent when this server has the initializeFromRemote task.
     */
    private InitializeRequestMsg initReqMsgSent = null;
    private InitializeRequestMsg initReqMsgSent;
    /**
     * Start time of the initialization process. ErrorMsg timestamped before
@@ -1116,12 +1120,47 @@
    }
    /**
     * Returns a boolean indicating if a total update import is currently in
     * Progress.
     *
     * @return A boolean indicating if a total update import is currently in
     *         Progress.
     */
    public boolean importInProgress()
    {
      return importInProgress;
    }
    /**
     * Returns the total number of entries to be processed when a total update
     * is in progress.
     *
     * @return The total number of entries to be processed when a total update
     *         is in progress.
     */
    long getTotalEntryCount()
    {
      return entryCount;
    }
    /**
     * Returns the number of entries still to be processed when a total update
     * is in progress.
     *
     * @return The number of entries still to be processed when a total update
     *         is in progress.
     */
    long getLeftEntryCount()
    {
      return entryLeftCount;
    }
    /**
     * Initializes the import/export counters with the provider value.
     * @param total Total number of entries to be processed.
     * @throws DirectoryException if an error occurred.
     */
    private void initializeCounters(long total)
      throws DirectoryException
    private void initializeCounters(long total) throws DirectoryException
    {
      entryCount = total;
      entryLeftCount = total;
@@ -1150,8 +1189,7 @@
     *
     * @throws DirectoryException if an error occurred.
     */
    public void updateCounters(int entriesDone)
      throws DirectoryException
    public void updateCounters(int entriesDone) throws DirectoryException
    {
      entryLeftCount -= entriesDone;
@@ -1241,7 +1279,7 @@
      // Recompute the server with the minAck returned,means the slowest server.
      slowestServerId = serverId;
      for (Integer sid : ieContext.ackVals.keySet())
      for (Integer sid : ieContext.get().ackVals.keySet())
      {
        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
        {
@@ -1346,10 +1384,7 @@
      int serverRunningTheTask, Task initTask, int initWindow)
  throws DirectoryException
  {
    DirectoryException exportRootException = null;
    // Acquire and initialize the export context
    acquireIEContext(false);
    final IEContext ieCtx = acquireIEContext(false);
    /*
    We manage the list of servers to initialize in order :
@@ -1365,7 +1400,7 @@
      for (DSInfo dsi : getReplicasList())
      {
        ieContext.startList.add(dsi.getDsId());
        ieCtx.startList.add(dsi.getDsId());
      }
      // We manage the list of servers with which a flow control can be enabled
@@ -1373,7 +1408,7 @@
      {
        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieContext.setAckVal(dsi.getDsId(), 0);
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
@@ -1382,7 +1417,7 @@
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
          getBaseDNString(), getServerId(), serverToInitialize));
      ieContext.startList.add(serverToInitialize);
      ieCtx.startList.add(serverToInitialize);
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
@@ -1390,11 +1425,13 @@
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieContext.setAckVal(dsi.getDsId(), 0);
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
    DirectoryException exportRootException = null;
    // loop for the case where the exporter is the initiator
    int attempt = 0;
    boolean done = false;
@@ -1402,34 +1439,34 @@
    {
      try
      {
        ieContext.exportTarget = serverToInitialize;
        ieCtx.exportTarget = serverToInitialize;
        if (initTask != null)
        {
          ieContext.initializeTask = initTask;
          ieCtx.initializeTask = initTask;
        }
        ieContext.initializeCounters(this.countEntries());
        ieContext.msgCnt = 0;
        ieContext.initNumLostConnections = broker.getNumLostConnections();
        ieContext.initWindow = initWindow;
        ieCtx.initializeCounters(this.countEntries());
        ieCtx.msgCnt = 0;
        ieCtx.initNumLostConnections = broker.getNumLostConnections();
        ieCtx.initWindow = initWindow;
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            getBaseDN(), getServerId(), serverToInitialize,
            serverRunningTheTask, ieContext.entryCount, initWindow);
            serverRunningTheTask, ieCtx.entryCount, initWindow);
        broker.publish(initTargetMsg);
        // Wait for all servers to be ok
        waitForRemoteStartOfInit();
        waitForRemoteStartOfInit(ieCtx);
        // Servers that left in the list are those for which we could not test
        // that they have been successfully initialized.
        if (!ieContext.failureList.isEmpty())
        if (!ieCtx.failureList.isEmpty())
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
                  ieContext.failureList.toString()));
                  ieCtx.failureList.toString()));
        }
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
@@ -1441,7 +1478,7 @@
      catch(DirectoryException exportException)
      {
        // Give priority to the first exception raised - stored in the context
        final DirectoryException ieEx = ieContext.exception;
        final DirectoryException ieEx = ieCtx.exception;
        exportRootException = ieEx != null ? ieEx : exportException;
      }
@@ -1500,10 +1537,8 @@
            continue;
          }
          ErrorMsg errorMsg =
              new ErrorMsg(serverToInitialize,
                  exportRootException.getMessageObject());
          broker.publish(errorMsg);
          broker.publish(new ErrorMsg(
              serverToInitialize, exportRootException.getMessageObject()));
        }
        catch(Exception e)
        {
@@ -1518,20 +1553,20 @@
    } // attempt loop
    // Wait for all servers to be ok, and build the failure list
    waitForRemoteEndOfInit();
    waitForRemoteEndOfInit(ieCtx);
    // Servers that left in the list are those for which we could not test
    // that they have been successfully initialized.
    if (!ieContext.failureList.isEmpty() && exportRootException == null)
    if (!ieCtx.failureList.isEmpty() && exportRootException == null)
    {
      exportRootException = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
                  Long.toString(getGenerationID()),
                  ieContext.failureList.toString()));
                  ieCtx.failureList.toString()));
    }
    // Don't forget to release IEcontext acquired at beginning.
    releaseIEContext();
    releaseIEContext(); // FIXME should not this be in a finally?
    final String cause = exportRootException == null ? ""
        : exportRootException.getLocalizedMessage();
@@ -1558,10 +1593,10 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteStartOfInit()
  private void waitForRemoteStartOfInit(IEContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieContext.startList);
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1580,7 +1615,7 @@
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + getGenerationID());
        if (ieContext.startList.contains(dsi.getDsId()))
        if (ieCtx.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
          {
@@ -1604,11 +1639,11 @@
    }
    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for start ends with " + ieContext.failureList);
        "[IE] wait for start ends with " + ieCtx.failureList);
  }
  /**
@@ -1616,10 +1651,10 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteEndOfInit()
  private void waitForRemoteEndOfInit(IEContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieContext.startList);
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1645,7 +1680,7 @@
      while (it.hasNext())
      {
        int serverId = it.next();
        if (ieContext.failureList.contains(serverId))
        if (ieCtx.failureList.contains(serverId))
        {
          /*
          this server has already been in error during initialization
@@ -1701,11 +1736,11 @@
    }
    while (!done && !broker.shuttingDown()); // infinite wait
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end ends with " + ieContext.failureList);
        "[IE] wait for end ends with " + ieCtx.failureList);
  }
  /**
@@ -1718,23 +1753,26 @@
    return state;
  }
  private synchronized void acquireIEContext(boolean importInProgress)
  throws DirectoryException
  /**
   * Acquire and initialize the import/export context, verifying no other
   * import/export is in progress.
   */
  private IEContext acquireIEContext(boolean importInProgress)
      throws DirectoryException
  {
    if (ieContext != null)
    final IEContext ieCtx = new IEContext(importInProgress);
    if (!ieContext.compareAndSet(null, ieCtx))
    {
      // Rejects 2 simultaneous exports
      Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    ieContext = new IEContext(importInProgress);
    return ieCtx;
  }
  private synchronized void releaseIEContext()
  private void releaseIEContext()
  {
    ieContext = null;
    ieContext.set(null);
  }
  /**
@@ -1743,13 +1781,13 @@
   *
   * @param errorMsg The error message received.
   */
  private void processErrorMsg(ErrorMsg errorMsg)
  private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
  {
    //Exporting must not be stopped on the first error, if we run initialize-all
    if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
    {
      // The ErrorMsg is received while we have started an initialization
      ieContext.setExceptionIfNoneSet(new DirectoryException(
      ieCtx.setExceptionIfNoneSet(new DirectoryException(
          ResultCode.OTHER, errorMsg.getDetails()));
      /*
@@ -1759,11 +1797,11 @@
       *   even after the nextInitAttemptDelay
       * During the import, the ErrorMsg will be received by receiveEntryBytes
       */
      if (ieContext.initializeTask instanceof InitializeTask)
      if (ieCtx.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask) ieContext.initializeTask)
            .updateTaskCompletionState(ieContext.getException());
        ((InitializeTask) ieCtx.initializeTask)
            .updateTaskCompletionState(ieCtx.getException());
        releaseIEContext();
      }
@@ -1781,6 +1819,7 @@
    ReplicationMsg msg;
    while (true)
    {
      IEContext ieCtx = ieContext.get();
      try
      {
        // In the context of the total update, we don't want any automatic
@@ -1807,7 +1846,7 @@
          else
          {
            // Handle connection issues
            ieContext.setExceptionIfNoneSet(new DirectoryException(
            ieCtx.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
                    .get(broker.getReplicationServer())));
            return null;
@@ -1819,26 +1858,26 @@
        {
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters(countEntryLimits(entryBytes));
          ieCtx.updateCounters(countEntryLimits(entryBytes));
          if (ieContext.exporterProtocolVersion >=
          if (ieCtx.exporterProtocolVersion >=
            ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // check the msgCnt of the msg received to check ordering
            if (++ieContext.msgCnt != entryMsg.getMsgId())
            if (++ieCtx.msgCnt != entryMsg.getMsgId())
            {
              ieContext.setExceptionIfNoneSet(new DirectoryException(
              ieCtx.setExceptionIfNoneSet(new DirectoryException(
                  ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
                      String.valueOf(ieContext.msgCnt),
                      String.valueOf(ieCtx.msgCnt),
                      String.valueOf(entryMsg.getMsgId()))));
              return null;
            }
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
            {
              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
              {
@@ -1864,12 +1903,12 @@
          This is an error termination during the import
          The error is stored and the import is ended by returning null
          */
          if (ieContext.getException() == null)
          if (ieCtx.getException() == null)
          {
            ErrorMsg errMsg = (ErrorMsg)msg;
            if (errMsg.getCreationTime() > ieContext.startTime)
            if (errMsg.getCreationTime() > ieCtx.startTime)
            {
              ieContext.setException(
              ieCtx.setException(
                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
              return null;
            }
@@ -1880,15 +1919,15 @@
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if (msg instanceof TopologyMsg
              && isRemoteDSConnected(ieContext.importSource) == null)
              && isRemoteDSConnected(ieCtx.importSource) == null)
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      getBaseDNString(),
                      Integer.toString(getServerId()),
                      Integer.toString(ieContext.importSource)));
            ieContext.setExceptionIfNoneSet(new DirectoryException(
                      Integer.toString(ieCtx.importSource)));
            ieCtx.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, errMsg));
            return null;
          }
@@ -1896,7 +1935,7 @@
      }
      catch(Exception e)
      {
        ieContext.setExceptionIfNoneSet(new DirectoryException(
        ieCtx.setExceptionIfNoneSet(new DirectoryException(
            ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
      }
@@ -1957,9 +1996,10 @@
          Arrays.toString(lDIFEntry));
    // build the message
    IEContext ieCtx = ieContext.get();
    EntryMsg entryMessage = new EntryMsg(
        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
        ++ieCtx.msgCnt);
    // Waiting the slowest loop
    while (!broker.shuttingDown())
@@ -1969,30 +2009,30 @@
      server that have been stored by the listener thread in the ieContext,
      we just abandon the export by throwing an exception.
      */
      if (ieContext.getException() != null)
      if (ieCtx.getException() != null)
      {
        throw new IOException(ieContext.getException().getMessage());
        throw new IOException(ieCtx.getException().getMessage());
      }
      int slowestServerId = ieContext.getSlowestServer();
      int slowestServerId = ieCtx.getSlowestServer();
      if (isRemoteDSConnected(slowestServerId)==null)
      {
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
                Integer.toString(ieContext.getSlowestServer()))));
                Integer.toString(ieCtx.getSlowestServer()))));
        throw new IOException("IOException with nested DirectoryException",
            ieContext.getException());
            ieCtx.getException());
      }
      int ourLastExportedCnt = ieContext.msgCnt;
      int slowestCnt = ieContext.ackVals.get(slowestServerId);
      int ourLastExportedCnt = ieCtx.msgCnt;
      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
      if (debugEnabled())
        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
      if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
      if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow)
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
@@ -2003,13 +2043,13 @@
        // process any connection error
        if (broker.hasConnectionError()
          || broker.getNumLostConnections() != ieContext.initNumLostConnections)
          || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
        {
          // publish failed - store the error in the ieContext ...
          DirectoryException de = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
                  Integer.toString(broker.getRsServerId())));
          ieContext.setExceptionIfNoneSet(de);
          ieCtx.setExceptionIfNoneSet(de);
          // .. and abandon the export by throwing an exception.
          throw new IOException(de.getMessage());
        }
@@ -2031,13 +2071,13 @@
    // process any publish error
    if (!sent
        || broker.hasConnectionError()
        || broker.getNumLostConnections() != ieContext.initNumLostConnections)
        || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
              Integer.toString(broker.getRsServerId())));
      ieContext.setExceptionIfNoneSet(de);
      ieCtx.setExceptionIfNoneSet(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
@@ -2045,11 +2085,11 @@
    // publish succeeded
    try
    {
      ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
      ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
    }
    catch (DirectoryException de)
    {
      ieContext.setExceptionIfNoneSet(de);
      ieCtx.setExceptionIfNoneSet(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
@@ -2127,17 +2167,14 @@
  public void initializeFromRemote(int source, Task initTask)
  throws DirectoryException
  {
    Message errMsg = null;
    if (debugEnabled())
    {
      TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
    }
    if (!broker.isConnected())
    {
      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString());
    }
    Message errMsg = !broker.isConnected()
        ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString())
        : null;
    /*
    We must not test here whether the remote source is connected to
@@ -2156,14 +2193,12 @@
      update the task.
      */
      acquireIEContext(true);  //test and set if no import already in progress
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
      final IEContext ieCtx = acquireIEContext(true);
      ieCtx.initializeTask = initTask;
      ieCtx.attemptCnt = 0;
      ieCtx.initReqMsgSent = new InitializeRequestMsg(
          getBaseDN(), getServerId(), source, getInitWindow());
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
      broker.publish(ieCtx.initReqMsgSent);
      /*
      The normal success processing is now to receive InitTargetMsg then
@@ -2219,6 +2254,7 @@
    int source = initTargetMsgReceived.getSenderID();
    IEContext ieCtx = ieContext.get();
    try
    {
      // Log starting
@@ -2236,16 +2272,16 @@
        server.
        Test and set if no import already in progress
        */
        acquireIEContext(true);
        ieCtx = acquireIEContext(true);
      }
      // Initialize stuff
      ieContext.importSource = source;
      ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
      ieCtx.importSource = source;
      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieContext.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask)ieContext.initializeTask;
      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask) ieCtx.initializeTask;
      // Launch the import
      importBackend(new ReplInputStream(this));
@@ -2257,14 +2293,14 @@
      Store the exception raised. It will be considered if no other exception
      has been previously stored in  the context
      */
      ieContext.setExceptionIfNoneSet(e);
      ieCtx.setExceptionIfNoneSet(e);
    }
    finally
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends import with exception=" + ieContext.getException()
          + " ends import with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected());
      }
@@ -2278,10 +2314,10 @@
      */
      broker.reStart(false);
      if (ieContext.getException() != null
      if (ieCtx.getException() != null
          && broker.isConnected()
          && initFromTask != null
          && ++ieContext.attemptCnt < 2)
          && ++ieCtx.attemptCnt < 2)
      {
          /*
          Worth a new attempt
@@ -2300,13 +2336,13 @@
            the request
            */
            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                ieContext.getException().getLocalizedMessage()));
                ieCtx.getException().getLocalizedMessage()));
            broker.publish(ieContext.initReqMsgSent);
            broker.publish(ieCtx.initReqMsgSent);
            ieContext.initializeCounters(0);
            ieContext.exception = null;
            ieContext.msgCnt = 0;
            ieCtx.initializeCounters(0);
            ieCtx.exception = null;
            ieCtx.msgCnt = 0;
            // Processing of the received initTargetMsgReceived is done
            // let's wait for the next one
@@ -2320,7 +2356,7 @@
            */
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
              ieCtx.getException().getLocalizedMessage()));
          }
      }
@@ -2330,19 +2366,19 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends initialization with exception=" + ieContext.getException()
          + " ends initialization with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromTask
          + " attempt=" + ieContext.attemptCnt);
          + " attempt=" + ieCtx.attemptCnt);
      }
      try
      {
        if (broker.isConnected() && ieContext.getException() != null)
        if (broker.isConnected() && ieCtx.getException() != null)
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
              ieContext.getException().getMessageObject());
              ieCtx.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        /*
@@ -2353,7 +2389,7 @@
        */
        if (initFromTask != null)
        {
          initFromTask.updateTaskCompletionState(ieContext.getException());
          initFromTask.updateTaskCompletionState(ieCtx.getException());
        }
      }
      finally
@@ -2361,8 +2397,8 @@
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            getBaseDNString(), initTargetMsgReceived.getSenderID(),
            getServerId(),
            (ieContext.getException() == null ? ""
                : ieContext.getException().getLocalizedMessage()));
            (ieCtx.getException() == null ? ""
                : ieCtx.getException().getLocalizedMessage()));
        logError(msg);
        releaseIEContext();
      } // finally
@@ -2435,7 +2471,7 @@
   */
  public boolean ieRunning()
  {
    return ieContext != null;
    return ieContext.get() != null;
  }
  /**
@@ -3449,43 +3485,13 @@
  }
  /**
   * Returns a boolean indicating if a total update import is currently
   * in Progress.
   * Returns the Import/Export context associated to this ReplicationDomain.
   *
   * @return A boolean indicating if a total update import is currently
   *         in Progress.
   * @return the Import/Export context associated to this ReplicationDomain
   */
  public boolean importInProgress()
  protected IEContext getImportExportContext()
  {
    return ieContext != null && ieContext.importInProgress;
  }
  /**
   * Returns a boolean indicating if a total update export is currently
   * in Progress.
   *
   * @return A boolean indicating if a total update export is currently
   *         in Progress.
   */
  public boolean exportInProgress()
  {
    return ieContext != null && !ieContext.importInProgress;
  }
  /**
   * Returns the number of entries still to be processed when a total update
   * is in progress.
   *
   * @return The number of entries still to be processed when a total update
   *         is in progress.
   */
  long getLeftEntryCount()
  {
    if (ieContext != null)
    {
      return ieContext.entryLeftCount;
    }
    return 0;
    return ieContext.get();
  }
  /**
@@ -3501,24 +3507,6 @@
  }
  /**
   * Returns the total number of entries to be processed when a total update
   * is in progress.
   *
   * @return The total number of entries to be processed when a total update
   *         is in progress.
   */
  long getTotalEntryCount()
  {
    if (ieContext != null)
    {
      return ieContext.entryCount;
    }
    return 0;
  }
  /**
   * Set the attributes configured on a server to be included in the ECL.
   *
   * @param serverId
@@ -3617,7 +3605,7 @@
   *          The serverId for which we want the include attributes.
   * @return The attributes.
   */
  public Set<String> getEclIncludes(int serverId)
  Set<String> getEclIncludes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
@@ -3635,7 +3623,7 @@
   *          The serverId for which we want the include attributes.
   * @return The attributes.
   */
  public Set<String> getEclIncludesForDeletes(int serverId)
  Set<String> getEclIncludesForDeletes(int serverId)
  {
    synchronized (eclIncludesLock)
    {