mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
24.44.2013 46fd9423ab622d7f9531aa1564846ec52fe09534
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -165,8 +166,8 @@
   * to be able to correlate all the coming back acks to the original
   * operation.
   */
  private final SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMsg>();
  private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs =
    new ConcurrentHashMap<ChangeNumber, UpdateMsg>();
  /**
@@ -243,7 +244,7 @@
  // that have not been successfully acknowledged (either because of timeout,
  // wrong status or error at replay) for a particular server (DS or RS). String
  // format: <server id>:<number of failed updates>
  private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Integer,Integer>();
  // Number of updates received in Assured Mode, Safe Read request
  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
@@ -264,7 +265,7 @@
  // Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
  // that have not been successfully acknowledged because of timeout for a
  // particular RS. String format: <server id>:<number of failed updates>
  private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  /**
@@ -278,10 +279,12 @@
  /* Status related monitoring fields */
  // Indicates the date when the status changed. This may be used to indicate
  // the date the session with the current replication server started (when
  // status is NORMAL for instance). All the above assured monitoring fields
  // are also reset each time the status is changed
  /**
   * Indicates the date when the status changed. This may be used to indicate
   * the date the session with the current replication server started (when
   * status is NORMAL for instance). All the above assured monitoring fields
   * are also reset each time the status is changed
   */
  private Date lastStatusChangeDate = new Date();
  /**
@@ -583,7 +586,7 @@
  }
  /**
   * Returns informations about the DS server related to the provided serverId.
   * Returns information about the DS server related to the provided serverId.
   * based on the TopologyMsg we received when the remote replica connected or
   * disconnected. Return null when no server with the provided serverId is
   * connected.
@@ -696,8 +699,7 @@
   */
  public void setURLs(Set<String> referralsUrl)
  {
    for (String url : referralsUrl)
      this.refUrls.add(url);
      this.refUrls.addAll(referralsUrl);
  }
  /**
@@ -793,11 +795,12 @@
          // Another server is exporting its entries to us
          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
          // This must be done while we are still holding the
          // broker lock because we are now going to receive a
          // bunch of entries from the remote server and we
          // want the import thread to catch them and
          // not the ListenerThread.
          /*
          This must be done while we are still holding the broker lock
          because we are now going to receive a bunch of entries from the
          remote server and we want the import thread to catch them and
          not the ListenerThread.
          */
          initialize(initTargetMsg, initTargetMsg.getSenderID());
        }
        else if (msg instanceof ErrorMsg)
@@ -805,15 +808,16 @@
          ErrorMsg errorMsg = (ErrorMsg)msg;
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //    For example, when we publish a request and the
            //    replicationServer did not find the import source.
            //
            // A remote error during the import will be received in the
            // receiveEntryBytes() method.
            //
            /*
            This is an error termination for the 2 following cases :
            - either during an export
            - or before an import really started
            For example, when we publish a request and the
            replicationServer did not find the import source.
            A remote error during the import will be received in the
            receiveEntryBytes() method.
            */
            if (debugEnabled())
              TRACER.debugInfo(
                  "[IE] processErrorMsg:" + this.serverID +
@@ -827,9 +831,11 @@
            }
            else
            {
              // Simply log - happen when the ErrorMsg relates to a previous
              // attempt of initialization while we have started a new one
              // on this side.
              /*
              Simply log - happen when the ErrorMsg relates to a previous
              attempt of initialization while we have started a new one
              on this side.
              */
              logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
            }
          }
@@ -864,15 +870,17 @@
      {
        // just retry
      }
      // Test if we have received and export request message and
      // if that's the case handle it now.
      // This must be done outside of the portion of code protected
      // by the broker lock so that we keep receiveing update
      // when we are doing and export and so that a possible
      // closure of the socket happening when we are publishing the
      // entries to the remote can be handled by the other
      // replay thread when they call this method and therefore the
      // broker.receive() method.
      /*
      Test if we have received and export request message and
      if that's the case handle it now.
      This must be done outside of the portion of code protected
      by the broker lock so that we keep receiving update
      when we are doing and export and so that a possible
      closure of the socket happening when we are publishing the
      entries to the remote can be handled by the other
      replay thread when they call this method and therefore the
      broker.receive() method.
      */
      if (initReqMsg != null)
      {
        // Do this work in a thread to allow replay thread continue working
@@ -898,8 +906,8 @@
   * particular server in the list. This increments the counter of error for the
   * passed server, or creates an initial value of 1 error for it if the server
   * is not yet present in the map.
   * @param errorList
   * @param sid
   * @param errorsByServer map of number of errors per serverID
   * @param sid the ID of the server which produced an error
   */
  private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
    Integer sid)
@@ -916,7 +924,7 @@
      {
        // Server already present in list, just increment number of
        // errors for the server
        int val = serverErrCount.intValue();
        int val = serverErrCount;
        val++;
        errorsByServer.put(sid, val);
      }
@@ -935,10 +943,7 @@
    // Remove the message for pending ack list (this may already make the thread
    // that is waiting for the ack be aware of its reception)
    synchronized (waitingAckMsgs)
    {
      update = waitingAckMsgs.remove(changeNumber);
    }
    update = waitingAckMsgs.remove(changeNumber);
    // Signal waiting thread ack has been received
    if (update != null)
@@ -957,8 +962,10 @@
      if ( hasTimeout || hasReplayErrors || hasWrongStatus)
      {
        // Some problems detected: message not correclty reached every requested
        // servers. Log problem
        /*
        Some problems detected: message did not correctly reach every
        requested servers. Log problem
        */
        Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
            serviceID, Integer.toString(serverID),
            update.toString(), ack.errorsToString());
@@ -1021,27 +1028,6 @@
    }
  }
  /**
   * Retrieves a replication domain based on the baseDn.
   *
   * @param serviceID           The identifier of the domain to retrieve.
   *
   * @return                    The domain retrieved.
   *
   * @throws DirectoryException When an error occurred or no domain
   *                            match the provided baseDn.
   */
  static ReplicationDomain retrievesReplicationDomain(String serviceID)
  throws DirectoryException
  {
    ReplicationDomain replicationDomain = domains.get(serviceID);
    if (replicationDomain == null)
    {
      throw new DirectoryException(ResultCode.OTHER,
          ERR_NO_MATCHING_DOMAIN.get(serviceID));
    }
    return replicationDomain;
  }
  /*
   * After this point the code is related to the Total Update.
@@ -1051,7 +1037,7 @@
   * This thread is launched when we want to export data to another server.
   *
   * When a task is created locally (so this local server is the initiator)
   * of the export (Exemple: dsreplication initialize-all),
   * of the export (Example: dsreplication initialize-all),
   * this thread is NOT used but the task thread is running the export instead).
   */
  private class ExportThread extends DirectoryThread
@@ -1095,9 +1081,11 @@
            initWindow);
      } catch (DirectoryException de)
      {
        // An error message has been sent to the peer
        // This server is not the initiator of the export so there is
        // nothing more to do locally.
        /*
        An error message has been sent to the peer
        This server is not the initiator of the export so there is
        nothing more to do locally.
        */
      }
      if (debugEnabled())
@@ -1211,29 +1199,6 @@
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     * @throws DirectoryException if an error occurred.
     */
    public void updateCounters()
      throws DirectoryException
    {
      entryLeftCount--;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
        }
      }
    }
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     *
     * @param  entriesDone The number of entries that were processed
     *                     since the last time this method was called.
@@ -1379,7 +1344,7 @@
   * on this server, and the {@code importBackend(InputStream)}
   * will be called on the remote server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * The InputStream and OutputStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param target   The server-id of the server that should be initialized.
@@ -1394,10 +1359,7 @@
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, this.serverID, initTask, this.initWindow);
  }
  /**
@@ -1426,10 +1388,12 @@
    // Acquire and initialize the export context
    acquireIEContext(false);
    // We manage the list of servers to initialize in order :
    // - to test at the end that all expected servers have reconnected
    //   after their import and with the right genId
    // - to update the task with the server(s) where this test failed
    /*
    We manage the list of servers to initialize in order :
    - to test at the end that all expected servers have reconnected
    after their import and with the right genId
    - to update the task with the server(s) where this test failed
    */
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
@@ -1526,14 +1490,15 @@
      {
        try
        {
          // Handling the errors during export
          /*
          Handling the errors during export
          // Note: we could have lost the connection and another thread
          //       the listener one) has already managed to reconnect.
          //       So we MUST rely on the test broker.isConnected()
          //       ONLY to do 'wait to be reconnected by another thread'
          //       (if not yet reconnected already).
          Note: we could have lost the connection and another thread
          the listener one) has already managed to reconnect.
          So we MUST rely on the test broker.isConnected()
          ONLY to do 'wait to be reconnected by another thread'
          (if not yet reconnected already).
          */
          if (!broker.isConnected())
          {
            // We are still disconnected, so we wait for the listener thread
@@ -1550,14 +1515,16 @@
          if ((initTask != null) && broker.isConnected() &&
              (serverToInitialize != RoutableMsg.ALL_SERVERS))
          {
            // NewAttempt case : In the case where
            // - it's not an InitializeAll
            // - AND the previous export attempt failed
            // - AND we are (now) connected
            // - and we own the task and this task is not an InitializeAll
            // Let's :
            // - sleep to let time to the other peer to reconnect if needed
            // - and launch another attempt
            /*
            NewAttempt case : In the case where
            - it's not an InitializeAll
            - AND the previous export attempt failed
            - AND we are (now) connected
            - and we own the task and this task is not an InitializeAll
            Let's :
            - sleep to let time to the other peer to reconnect if needed
            - and launch another attempt
            */
            try { Thread.sleep(1000); } catch(Exception e){}
            logError(NOTE_RESENDING_INIT_TARGET.get(
                exportRootException.getLocalizedMessage()));
@@ -1632,8 +1599,7 @@
    int waitResultAttempt = 0;
    Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
      replicasWeAreWaitingFor.add(sid);
    replicasWeAreWaitingFor.addAll(ieContext.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1657,8 +1623,11 @@
          {
            // this one is still not doing the Full Update ... retry later
            done = false;
            try
            { Thread.sleep(100); } catch (InterruptedException e) {}
            try { Thread.sleep(100);
            }
            catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
            waitResultAttempt++;
            break;
          }
@@ -1673,8 +1642,7 @@
    while ((!done) && (waitResultAttempt<1200) // 2mn
        && (!broker.shuttingDown()));
    ieContext.failureList.addAll(
        Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1697,9 +1665,11 @@
      TRACER.debugInfo(
        "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    // In case some new servers appear during the init, we want them to be
    // considered in the processing of sorting the successfully initialized
    // and the others
    /*
    In case some new servers appear during the init, we want them to be
    considered in the processing of sorting the successfully initialized
    and the others
    */
    for (DSInfo dsi : getReplicasList())
      replicasWeAreWaitingFor.add(dsi.getDsId());
@@ -1709,22 +1679,25 @@
      done = true;
      short reconnectMaxDelayInSec = 10;
      short reconnectWait = 0;
      Integer[] servers = replicasWeAreWaitingFor.toArray(new Integer[0]);
      for (int serverId : servers)
      for (int serverId : replicasWeAreWaitingFor)
      {
        if (ieContext.failureList.contains(serverId))
        {
          // this server has already been in error during initialization
          // dont't wait for it
          /*
          this server has already been in error during initialization
          don't wait for it
          */
          continue;
        }
        DSInfo dsInfo = isRemoteDSConnected(serverId);
        if (dsInfo == null)
        {
          // this server is disconnected
          // may be for a long time if it crashed or had been stopped
          // may be just the time to reconnect after import : should be short
          /*
          this server is disconnected
          may be for a long time if it crashed or had been stopped
          may be just the time to reconnect after import : should be short
          */
          if (++reconnectWait<reconnectMaxDelayInSec)
          {
            // let's still wait to give a chance to this server to reconnect
@@ -1764,8 +1737,7 @@
    }
    while ((!done) && (!broker.shuttingDown())); // infinite wait
    ieContext.failureList.addAll(
        Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1839,8 +1811,10 @@
      }
      else
      {
        // When we are the exporter in the case of initializeAll
        // exporting must not be stopped on the first error.
        /*
        When we are the exporter in the case of initializeAll
        exporting must not be stopped on the first error.
        */
      }
    }
  }
@@ -1889,7 +1863,7 @@
          }
        }
        // Check good sequentiality of msg received
        // Check good ordering of msg received
        if (msg instanceof EntryMsg)
        {
          EntryMsg entryMsg = (EntryMsg)msg;
@@ -1899,7 +1873,7 @@
          if (ieContext.exporterProtocolVersion >=
            ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // check the msgCnt of the msg received to check sequenciality
            // check the msgCnt of the msg received to check ordering
            if (++ieContext.msgCnt != entryMsg.getMsgId())
            {
              if (ieContext.getException() == null)
@@ -1928,16 +1902,20 @@
        }
        else if (msg instanceof DoneMsg)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          /*
          This is the normal termination of the import
          No error is stored and the import is ended
          by returning null
          */
          return null;
        }
        else if (msg instanceof ErrorMsg)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          /*
          This is an error termination during the import
          The error is stored and the import is ended
          by returning null
          */
          if (ieContext.getException() == null)
          {
            ErrorMsg errMsg = (ErrorMsg)msg;
@@ -1971,7 +1949,6 @@
      }
      catch(Exception e)
      {
        // TODO: i18n
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
@@ -1984,7 +1961,7 @@
   * This is based on the hypothesis that the entries are separated
   * by a "\n\n" String.
   *
   * @param   entryBytes
   * @param   entryBytes the set of bytes containing one or more entries.
   * @return  The number of entries in the provided byte[].
   */
  private int countEntryLimits(byte[] entryBytes)
@@ -1997,7 +1974,7 @@
   * This is based on the hypothesis that the entries are separated
   * by a "\n\n" String.
   *
   * @param   entryBytes
   * @param   entryBytes the set of bytes containing one or more entries.
   * @return  The number of entries in the provided byte[].
   */
  private int countEntryLimits(byte[] entryBytes, int pos, int length)
@@ -2029,7 +2006,8 @@
  throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" +
          Arrays.toString(lDIFEntry));
    // build the message
    EntryMsg entryMessage = new EntryMsg(
@@ -2039,9 +2017,11 @@
    // Waiting the slowest loop
    while (!broker.shuttingDown())
    {
      // If an error was raised - like receiving an ErrorMsg from a remote
      // server that have been stored by the listener thread in the ieContext,
      // we just abandon the export by throwing an exception.
      /*
      If an error was raised - like receiving an ErrorMsg from a remote
      server that have been stored by the listener thread in the ieContext,
      we just abandon the export by throwing an exception.
      */
      if (ieContext.getException() != null)
        throw(new IOException(ieContext.getException().getMessage()));
@@ -2094,7 +2074,8 @@
    } // Waiting the slowest loop
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
          + Arrays.toString(lDIFEntry));
    // publish the message
    boolean sent = broker.publish(entryMessage, false);
@@ -2212,18 +2193,22 @@
      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
    }
    // We must not test here whether the remote source is connected to
    // the topology by testing if it stands in the replicas list since.
    // In the case of a re-attempt of initialization, the listener thread is
    // running this method directly coming from initailize() method and did
    // not processed any topology message in between the failure and the
    // new attempt.
    /*
    We must not test here whether the remote source is connected to
    the topology by testing if it stands in the replicas list since.
    In the case of a re-attempt of initialization, the listener thread is
    running this method directly coming from initialize() method and did
    not processed any topology message in between the failure and the
    new attempt.
    */
    try
    {
      // We must immediatly acquire a context to store the task inside
      // The context will be used when we (the listener thread) will receive
      // the InitializeTargetMsg, process the import, and at the end
      // update the task.
      /*
      We must immediately acquire a context to store the task inside
      The context will be used when we (the listener thread) will receive
      the InitializeTargetMsg, process the import, and at the end
      update the task.
      */
      acquireIEContext(true);  //test and set if no import already in progress
      ieContext.initializeTask = initTask;
@@ -2234,11 +2219,13 @@
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
      // The normal success processing is now to receive InitTargetMsg then
      // entries from the remote server.
      // The error cases are :
      // - either local error immediatly caught below
      // - a remote error we will receive as an ErrorMsg
      /*
      The normal success processing is now to receive InitTargetMsg then
      entries from the remote server.
      The error cases are :
      - either local error immediately caught below
      - a remote error we will receive as an ErrorMsg
      */
    }
    catch(DirectoryException de)
    {
@@ -2272,15 +2259,15 @@
   *
   * @param initTargetMsgReceived The message received from the remote server.
   *
   * @param requestorServerId The serverId of the server that requested the
   * @param requesterServerId The serverId of the server that requested the
   *                          initialization meaning the server where the
   *                          task has initially been created (this server,
   *                          or the remote server).
   */
  void initialize(InitializeTargetMsg initTargetMsgReceived,
      int requestorServerId)
      int requesterServerId)
  {
    InitializeTask initFromtask = null;
    InitializeTask initFromTask = null;
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
@@ -2300,16 +2287,20 @@
      // Acquire an import context if no already done (and initialize).
      if (initTargetMsgReceived.getInitiatorID() == this.serverID)
      {
        // The initTargetMsgReceived received is the answer to a request that
        // we (this server) sent previously. In this case, so the IEContext
        // has been already acquired when the request was published in order
        // to store the task (to be updated with the status at the end).
        /*
        The initTargetMsgReceived received is the answer to a request that
        we (this server) sent previously. In this case, so the IEContext
        has been already acquired when the request was published in order
        to store the task (to be updated with the status at the end).
        */
      }
      else
      {
        // The initTargetMsgReceived is for an import initiated by the remote
        // server.
        // Test and set if no import already in progress
        /*
        The initTargetMsgReceived is for an import initiated by the remote
        server.
        Test and set if no import already in progress
        */
        acquireIEContext(true);
      }
@@ -2319,16 +2310,18 @@
      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieContext.exporterProtocolVersion = getProtocolVersion(source);
      initFromtask = (InitializeTask)ieContext.initializeTask;
      initFromTask = (InitializeTask)ieContext.initializeTask;
      // Lauch the import
      // Launch the import
      importBackend(new ReplInputStream(this));
    }
    catch (DirectoryException e)
    {
      // Store the exception raised. It will be considered if no other exception
      // has been previously stored in  the context
      /*
      Store the exception raised. It will be considered if no other exception
      has been previously stored in  the context
      */
      if (ieContext.getException() == null)
        ieContext.setException(e);
    }
@@ -2339,30 +2332,37 @@
          + " ends import with exception=" + ieContext.getException()
          + " connected=" + broker.isConnected());
      // It is necessary to restart (reconnect to RS) for different reasons
      //   - when everything went well, reconnect in order to exchange
      //     new state, new generation ID
      //   - when we have connection failure, reconnect to retry a new import
      //     right here, right now
      // we never want retryOnFailure if we fails reconnecting in the restart.
      /*
      It is necessary to restart (reconnect to RS) for different reasons
      - when everything went well, reconnect in order to exchange
      new state, new generation ID
      - when we have connection failure, reconnect to retry a new import
      right here, right now
      we never want retryOnFailure if we fails reconnecting in the restart.
      */
      broker.reStart(false);
      if (ieContext.getException() != null)
      {
        if (broker.isConnected() && (initFromtask != null)
        if (broker.isConnected() && (initFromTask != null)
            && (++ieContext.attemptCnt<2))
        {
          // Worth a new attempt
          // since initFromtask is in this server, connection is ok
          /*
          Worth a new attempt
          since initFromTask is in this server, connection is ok
          */
          try
          {
            // Wait for the exporter to stabilize - eventually reconnect as
            // well if it was connected to the same RS than the one we lost ...
            /*
            Wait for the exporter to stabilize - eventually reconnect as
            well if it was connected to the same RS than the one we lost ...
            */
            Thread.sleep(1000);
            // Restart the whole import protocol exchange by sending again
            // the request
            /*
            Restart the whole import protocol exchange by sending again
            the request
            */
            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                ieContext.getException().getLocalizedMessage()));
@@ -2378,8 +2378,10 @@
          }
          catch(Exception e)
          {
            // An error occurs when sending a new request for a new import.
            // This error is not stored, prefering to keep the initial one.
            /*
            An error occurs when sending a new request for a new import.
            This error is not stored, prefering to keep the initial one.
            */
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
@@ -2394,7 +2396,7 @@
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends initialization with exception=" + ieContext.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromtask
          + " task=" + initFromTask
          + " attempt=" + ieContext.attemptCnt);
      try
@@ -2402,24 +2404,28 @@
        if (broker.isConnected() && (ieContext.getException() != null))
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
              ieContext.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        else // !broker.isConnected()
        {
          // Don't try to reconnect here.
          // The current running thread is the listener thread and will loop on
          // receive() that is expected to manage reconnects attempt.
          /*
          Don't try to reconnect here.
          The current running thread is the listener thread and will loop on
          receive() that is expected to manage reconnects attempt.
          */
        }
        // Update the task that initiated the import must be the last thing.
        // Particularly, broker.restart() after import success must be done
        // before some other operations/tasks to be launched,
        // like resetting the generation ID.
        if (initFromtask != null)
        /*
        Update the task that initiated the import must be the last thing.
        Particularly, broker.restart() after import success must be done
        before some other operations/tasks to be launched,
        like resetting the generation ID.
        */
        if (initFromTask != null)
        {
          initFromtask.updateTaskCompletionState(ieContext.getException());
          initFromTask.updateTaskCompletionState(ieContext.getException());
        }
      }
      finally
@@ -2435,10 +2441,10 @@
  }
  /**
   * Return the protocol version of the DS related to the provided serverid.
   * Return the protocol version of the DS related to the provided serverId.
   * Returns -1 when the protocol version is not known.
   * @param dsServerId The provided serverid.
   * @return The procotol version.
   * @param dsServerId The provided serverId.
   * @return The protocol version.
   */
  short getProtocolVersion(int dsServerId)
  {
@@ -2515,11 +2521,11 @@
  private void checkGenerationID(long generationID)
  throws DirectoryException
  {
    boolean allset = true;
    boolean allSet = true;
    for (int i = 0; i< 50; i++)
    {
      allset = true;
      allSet = true;
      for (RSInfo rsInfo : getRsList())
      {
        // the 'empty' RSes (generationId==-1) are considered as good citizens
@@ -2532,16 +2538,16 @@
          } catch (InterruptedException e)
          {
          }
          allset = false;
          allSet = false;
          break;
        }
      }
      if (allset)
      if (allSet)
      {
        break;
      }
    }
    if (!allset)
    if (!allSet)
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
@@ -2735,9 +2741,11 @@
   */
  void processUpdateDoneSynchronous(UpdateMsg msg)
  {
    // Warning: in synchronous mode, no way to tell the replay of an update went
    // wrong Just put null in processUpdateDone so that if assured replication
    // is used the ack is sent without error at replay flag.
    /*
    Warning: in synchronous mode, no way to tell the replay of an update went
    wrong Just put null in processUpdateDone so that if assured replication
    is used the ack is sent without error at replay flag.
    */
    processUpdateDone(msg, null);
    state.update(msg.getChangeNumber());
  }
@@ -2749,10 +2757,7 @@
   */
  public boolean isConnected()
  {
    if (broker != null)
      return broker.isConnected();
    else
      return false;
    return broker != null && broker.isConnected();
  }
  /**
@@ -2764,10 +2769,7 @@
   */
  public boolean hasConnectionError()
  {
    if (broker != null)
      return broker.hasConnectionError();
    else
      return true;
    return broker == null || broker.hasConnectionError();
  }
  /**
@@ -2852,24 +2854,16 @@
  /**
   * Gets the number of updates sent in assured safe read mode that have not
   * been acknowledged per server.
   * @return The number of updates sent in assured safe read mode that have not
   * been acknowledged per server.
   * @return A copy of the map that contains the number of updates sent in
   * assured safe read mode that have not been acknowledged per server.
   */
  public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
  {
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
    synchronized(assuredSrServerNotAcknowledgedUpdates)
    {
      Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
      for (Integer serverId : keySet)
      {
        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
        snapshot.put(serverId, i);
      }
      return new HashMap<Integer, Integer>(
          assuredSrServerNotAcknowledgedUpdates);
    }
    return snapshot;
  }
  /**
@@ -2937,24 +2931,16 @@
  /**
   * Gets the number of updates sent in assured safe data mode that have not
   * been acknowledged due to timeout error per server.
   * @return The number of updates sent in assured safe data mode that have not
   * been acknowledged due to timeout error per server.
   * @return A copy of the map that contains the number of updates sent in
   * assured safe data mode that have not been acknowledged due to timeout
   * error per server.
   */
  public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
  {
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
    synchronized(assuredSdServerTimeoutUpdates)
    {
      Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet();
      for (Integer serverId : keySet)
      {
        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
        snapshot.put(serverId, i);
      }
      return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates);
    }
    return snapshot;
  }
  /**
@@ -2981,14 +2967,20 @@
    assuredSrTimeoutUpdates = new AtomicInteger(0);
    assuredSrWrongStatusUpdates = new AtomicInteger(0);
    assuredSrReplayErrorUpdates = new AtomicInteger(0);
    assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>();
    synchronized (assuredSrServerNotAcknowledgedUpdates)
    {
      assuredSrServerNotAcknowledgedUpdates.clear();
    }
    assuredSrReceivedUpdates = new AtomicInteger(0);
    assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
    assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
    assuredSdSentUpdates = new AtomicInteger(0);
    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
    assuredSdTimeoutUpdates = new AtomicInteger(0);
    assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>();
    synchronized (assuredSdServerTimeoutUpdates)
    {
      assuredSdServerTimeoutUpdates.clear();
    }
  }
  /*
@@ -3080,8 +3072,10 @@
  {
    synchronized (sessionLock)
    {
      // Stop the broker first in order to prevent the listener from
      // reconnecting - see OPENDJ-457.
      /*
      Stop the broker first in order to prevent the listener from
      reconnecting - see OPENDJ-457.
      */
      if (broker != null)
      {
        broker.stop();
@@ -3263,8 +3257,10 @@
  {
    broker.updateWindowAfterReplay();
    // Send an ack if it was requested and the group id is the same of the RS
    // one. Only Safe Read mode makes sense in DS for returning an ack.
    /*
    Send an ack if it was requested and the group id is the same of the RS
    one. Only Safe Read mode makes sense in DS for returning an ack.
    */
    byte rsGroupId = broker.getRsGroupId();
    if (msg.isAssured())
    {
@@ -3282,9 +3278,9 @@
            if (replayErrorMsg != null)
            {
              // Mark the error in the ack
              //   -> replay error occured
              //   -> replay error occurred
              ackMsg.setHasReplayError(true);
              //   -> replay error occured in our server
              //   -> replay error occurred in our server
              List<Integer> idList = new ArrayList<Integer>();
              idList.add(serverID);
              ackMsg.setFailedServers(idList);
@@ -3306,8 +3302,10 @@
          logError(errorMsg);
        } else
        {
          // In safe data mode assured update that comes up to a DS requires no
          // ack from a destinator DS. Safe data mode is based on RS acks only
          /*
          In safe data mode assured update that comes up to a DS requires no
          ack from a recipient DS. Safe data mode is based on RS acks only
          */
        }
      }
    }
@@ -3343,7 +3341,7 @@
     * If assured configured, set message accordingly to request an ack in the
     * right assured mode.
     * No ack requested for a RS with a different group id. Assured
     * replication suported for the same locality, i.e: a topology working in
     * replication supported for the same locality, i.e: a topology working in
     * the same
     * geographical location). If we are connected to a RS which is not in our
     * locality, no need to ask for an ack.
@@ -3355,12 +3353,11 @@
      if (assuredMode == AssuredMode.SAFE_DATA_MODE)
        msg.setSafeDataLevel(assuredSdLevel);
      // Add the assured message to the list of update that are
      // waiting for acks
      synchronized (waitingAckMsgs)
      {
        waitingAckMsgs.put(msg.getChangeNumber(), msg);
      }
      /*
      Add the assured message to the list of update that are
      waiting for acks
      */
      waitingAckMsgs.put(msg.getChangeNumber(), msg);
    }
  }
@@ -3410,8 +3407,10 @@
      {
        try
        {
          // WARNING: this timeout may be difficult to optimize: too low, it
          // may use too much CPU, too high, it may penalize performance...
          /*
          WARNING: this timeout may be difficult to optimize: too low, it
          may use too much CPU, too high, it may penalize performance...
          */
          msg.wait(10);
        } catch (InterruptedException e)
        {
@@ -3425,14 +3424,13 @@
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        {
          // Timeout occured, be sure that ack is not being received and if so,
          // remove the update from the wait list, log the timeout error and
          // also update assured monitoring counters
          /*
          Timeout occurred, be sure that ack is not being received and if so,
          remove the update from the wait list, log the timeout error and
          also update assured monitoring counters
          */
          UpdateMsg update;
          synchronized (waitingAckMsgs)
          {
            update = waitingAckMsgs.remove(cn);
          }
          update = waitingAckMsgs.remove(cn);
          if (update != null)
          {
@@ -3490,9 +3488,9 @@
  }
  /**
   * Publish informations to the Replication Service (not assured mode).
   * Publish information to the Replication Service (not assured mode).
   *
   * @param msg  The byte array containing the informations that should
   * @param msg  The byte array containing the information that should
   *             be sent to the remote entities.
   */
  public void publish(byte[] msg)
@@ -3501,10 +3499,11 @@
    synchronized (this)
    {
      update = new UpdateMsg(generator.newChangeNumber(), msg);
      // If assured replication is configured, this will prepare blocking
      // mechanism. If assured replication is disabled, this returns
      // immediately
      /*
      If assured replication is configured, this will prepare blocking
      mechanism. If assured replication is disabled, this returns
      immediately
      */
      prepareWaitForAckIfAssuredEnabled(update);
      publish(update);
@@ -3512,16 +3511,18 @@
    try
    {
      // If assured replication is enabled, this will wait for the matching
      // ack or time out. If assured replication is disabled, this returns
      // immediately
      /*
      If assured replication is enabled, this will wait for the matching
      ack or time out. If assured replication is disabled, this returns
      immediately
      */
      waitForAckIfAssuredEnabled(update);
    } catch (TimeoutException ex)
    {
      // This exception may only be raised if assured replication is
      // enabled
      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
        assuredTimeout), msg.toString());
        assuredTimeout), update.toString());
      logError(errorMsg);
    }
  }
@@ -3557,10 +3558,7 @@
   */
  public boolean importInProgress()
  {
    if (ieContext == null)
      return false;
    else
      return ieContext.importInProgress;
    return ieContext != null && ieContext.importInProgress;
  }
  /**
@@ -3572,10 +3570,7 @@
   */
  public boolean exportInProgress()
  {
    if (ieContext == null)
      return false;
    else
      return !ieContext.importInProgress;
    return ieContext != null && !ieContext.importInProgress;
  }
  /**