mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
27.28.2010 a5c5efbf8ca56c059709953f7fedb647dadaed06
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -72,8 +72,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.loggers.debug.DebugTracer;
@@ -91,11 +92,13 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -284,6 +287,15 @@
  private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  /**
   * Window size used during initialization .. between
   * - the initializer/exporter DS that listens/waits acknowledges and that
   *   slows down data msg publishing based on the slowest server
   * - and each initialized/importer DS that publishes acknowledges each
   *   WINDOW/2 data msg received.
   */
  protected int initWindow = 100;
  /* Status related monitoring fields */
  // Indicates the date when the status changed. This may be used to indicate
@@ -328,6 +340,28 @@
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param initWindow Window used during initialization.
   */
  public ReplicationDomain(String serviceID, int serverID,int initWindow)
  {
    this.serviceID = serviceID;
    this.serverID = serverID;
    this.initWindow = initWindow;
    this.state = new ServerState();
    this.generator = new ChangeNumberGenerator(serverID, state);
    domains.put(serviceID, this);
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   *
   * @param serviceID  The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   */
  public ReplicationDomain(String serviceID, int serverID)
  {
@@ -557,6 +591,22 @@
  }
  /**
   * Check if a remote replica (DS) is connected to the topology based on
   * the TopologyMsg we received when the remote replica connected or
   * disconnected.
   *
   * @param serverId The provided serverId of the remote replica
   * @return whether the remote replica is connected or not.
   */
  public boolean isRemoteDSConnected(int serverId)
  {
    for (DSInfo remoteDS : getReplicasList())
      if (remoteDS.getDsId() == serverId)
        return true;
    return false;
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
@@ -708,7 +758,8 @@
  /**
   * Receives an update message from the replicationServer.
   * also responsible for updating the list of pending changes
   * The other types of messages are processed in an opaque way for the caller.
   * Also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
  UpdateMsg receive()
@@ -717,11 +768,11 @@
    while (update == null)
    {
      InitializeRequestMsg initMsg = null;
      InitializeRequestMsg initReqMsg = null;
      ReplicationMsg msg;
      try
      {
        msg = broker.receive(true);
        msg = broker.receive(true, true, false);
        if (msg == null)
        {
          // The server is in the shutdown process
@@ -741,54 +792,58 @@
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMsg)msg;
          initReqMsg = (InitializeRequestMsg)msg;
        }
        else if (msg instanceof InitializeTargetMsg)
        {
          // Another server is exporting its entries to us
          InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
          try
          {
            // This must be done while we are still holding the
            // broker lock because we are now going to receive a
            // bunch of entries from the remote server and we
            // want the import thread to catch them and
            // not the ListenerThread.
            initialize(importMsg);
          }
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMsg errorMsg =
              new ErrorMsg(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
            TRACER.debugInfo(Message.toString(mb.toMessage()));
            broker.publish(errorMsg);
            logError(de.getMessageObject());
          }
          // This must be done while we are still holding the
          // broker lock because we are now going to receive a
          // bunch of entries from the remote server and we
          // want the import thread to catch them and
          // not the ListenerThread.
          initialize(initTargetMsg, initTargetMsg.getSenderID());
        }
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMsg)msg);
            //    For example, when we publish a request and the
            //    replicationServer did not find the import source.
            //
            // A remote error during the import will be received in the
            // receiveEntryBytes() method.
            //
            if (debugEnabled())
              TRACER.debugInfo(
                  "[IE] processErrorMsg:" + this.serverID +
                  " serviceID: " + this.serviceID +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
            {
              // consider only ErrorMsg that relate to the current import/export
              processErrorMsg(errorMsg);
            }
            else
            {
              // Simply log - happen when the ErrorMsg relates to a previous
              // attempt of initialization while we have started a new one
              // on this side.
              logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
            }
          }
          else
          {
            /*
             * Log error message
             */
            ErrorMsg errorMsg = (ErrorMsg)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
            // Simply log - happen if import/export has been terminated
            // on our side before receiving this ErrorMsg.
            logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
          }
        }
        else if (msg instanceof ChangeStatusMsg)
@@ -801,6 +856,15 @@
          update = (UpdateMsg) msg;
          generator.adjust(update.getChangeNumber());
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          if (ieContext != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
            ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
          }
          // Trash this msg When no input/export is running/should never happen
        }
      }
      catch (SocketTimeoutException e)
      {
@@ -815,10 +879,11 @@
      // entries to the remote can be handled by the other
      // replay thread when they call this method and therefore the
      // broker.receive() method.
      if (initMsg != null)
      if (initReqMsg != null)
      {
        // Do this work in a thread to allow replay thread continue working
        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
        ExportThread exportThread = new ExportThread(
            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
        exportThread.start();
      }
    }
@@ -989,23 +1054,29 @@
   */
  /**
   * This thread is launched when we want to export data to another server that
   * has requested to be initialized with the data of our backend.
   * This thread is launched when we want to export data to another server.
   *
   * When a task is created locally (so this local server is the initiator)
   * of the export (Exemple: dsreplication initialize-all),
   * this thread is NOT used but the task thread is running the export instead).
   */
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will receive updates
    private int target;
    // Id of server that will be initialized
    private int serverToInitialize;
    private int initWindow;
    /**
     * Constructor for the ExportThread.
     *
     * @param i Id of server that will receive updates
     * @param serverToInitialize serverId of server that will receive entries
     */
    public ExportThread(int i)
    public ExportThread(int serverToInitialize, int initWindow)
    {
      super("Export thread " + serverID);
      this.target = i;
      super("Export thread from serverId=" + serverID
          + " to serverId=" + serverToInitialize);
      this.serverToInitialize = serverToInitialize;
      this.initWindow = initWindow;
    }
    /**
@@ -1015,22 +1086,20 @@
    public void run()
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread starting.");
      }
        TRACER.debugInfo("[IE] starting " + this.getName());
      try
      {
        initializeRemote(target, target, null);
        initializeRemote(serverToInitialize, serverToInitialize, null,
            initWindow);
      } catch (DirectoryException de)
      {
      // An error message has been sent to the peer
      // Nothing more to do locally
        // An error message has been sent to the peer
        // This server is not the initiator of the export so there is
        // nothing more to do locally.
      }
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread stopping.");
      }
        TRACER.debugInfo("[IE] ending " + this.getName());
    }
  }
@@ -1052,13 +1121,49 @@
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    // The exception raised when any
    // Exception raised during the initialization.
    DirectoryException exception = null;
    // A boolean indicating if the context is related to an
    // import or an export.
    // Whether the context is related to an import or an export.
    boolean importInProgress;
    // Current counter of messages exchanged during the initialization
    int msgCnt = 0;
    // Number of connections lost when we start the initialization.
    // Will help counting connections lost during initialization,
    int initNumLostConnections = 0;
    // Request message sent when this server has the initializeFromRemote task.
    InitializeRequestMsg initReqMsgSent = null;
    // Start time of the initialization process. ErrorMsg timestamped
    // before thi startTime will be ignored.
    long startTime;
    // List fo replicas (DS) connected to the topology when
    // initialization started.
    Set<Integer> startList = new HashSet<Integer>(0);
    // List fo replicas (DS) with a failure (disconnected from the topology)
    // since the initialization started.
    Set<Integer> failureList = new HashSet<Integer>(0);
    // Flow control during initialization
    // - for each remote server, counter of messages received
    private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
    // - serverId of the slowest server (the one with the smallest non null
    //   counter)
    private int slowestServerId = -1;
    short exporterProtocolVersion = -1;
    // Window used during this initialization
    int initWindow;
    // Number of attempt already done for this initialization
    short attemptCnt;
    /**
     * Creates a new IEContext.
     *
@@ -1069,19 +1174,21 @@
    public IEContext(boolean importInProgress)
    {
      this.importInProgress = importInProgress;
      this.startTime = System.currentTimeMillis();
      this.attemptCnt = 0;
    }
    /**
     * Initializes the import/export counters with the provider value.
     * @param total Total number of entries to be processed.
     * @param left Remaining number of entries to be processed.
     * @throws DirectoryException if an error occurred.
     */
    public void setCounters(long total, long left)
    private void initializeCounters(long total)
      throws DirectoryException
    {
      entryCount = total;
      entryLeftCount = left;
      entryLeftCount = total;
      if (initializeTask != null)
      {
@@ -1193,7 +1300,42 @@
    {
      this.exception = exception;
    }
  }
    /**
     * Set the id of the EntryMsg acknowledged from a receiver (importer)server.
     * (updated via the listener thread)
     * @param serverId serverId of the acknowledger/receiver/importer server.
     * @param numAck   id of the message received.
     */
    public void setAckVal(int serverId, int numAck)
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck);
      this.ackVals.put(serverId, numAck);
      // Recompute the server with the minAck returned,means the slowest server.
      slowestServerId = serverId;
      for (Integer sid : ieContext.ackVals.keySet())
        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
          slowestServerId = sid;
    }
    /**
     * Returns the serverId of the server that acknowledged the smallest
     * EntryMsg id.
     * @return serverId of the server with latest acknowledge.
     *                  0 when no ack has been received yet.
     */
    public int getSlowestServer()
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId
            + " " + this.ackVals.get(slowestServerId));
      return this.slowestServerId;
    }
}
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
@@ -1260,34 +1402,10 @@
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, serverID, initTask);
    if (target == RoutableMsg.ALL_SERVERS)
    {
      // Check for the status of all remote servers to check if they
      // are all finished with the import.
      boolean done = true;
      do
      {
        done = true;
        for (DSInfo dsi : getReplicasList())
        {
          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          {
            done = false;
            try
            {
              Thread.sleep(100);
            } catch (InterruptedException e)
            {
              // just loop again.
            }
            break;
          }
        }
      }
      while (!done);
    }
    initializeRemote(target, this.serverID, initTask, this.initWindow);
  }
  /**
@@ -1295,76 +1413,332 @@
   * specified by the target argument when this initialization specifying the
   * server that requests the initialization.
   *
   * @param target The target that should be initialized.
   * @param target2 The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   * @param serverToInitialize The target server that should be initialized.
   * @param serverRunningTheTask The server that initiated the export. It can
   * be the serverID of this server, or the serverID of a remote server.
   * @param initTask The task in this server that triggers this initialization
   * and that should be updated with its progress. Null when the export is done
   * following a request coming from a remote server (task is remote).
   * @param initWindow The value of the initialization window for flow control
   * between the importer and the exporter.
   *
   * @exception DirectoryException When an error occurs.
   * @exception DirectoryException When an error occurs. No exception raised
   * means success.
   */
  protected void initializeRemote(int target, int target2,
    Task initTask) throws DirectoryException
  protected void initializeRemote(int serverToInitialize,
      int serverRunningTheTask, Task initTask, int initWindow)
  throws DirectoryException
  {
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
        Integer.toString(serverID),
      serviceID,
      Integer.toString(target2));
    logError(msg);
    DirectoryException exportRootException = null;
    boolean contextAcquired = false;
    boolean contextAcquired=false;
    // Acquire and initialize the export context
    acquireIEContext(false);
    contextAcquired = true;
    ieContext.exportTarget = target;
    if (initTask != null)
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
        Integer.toString(serverID), Long.toString(countEntries()), serviceID,
        Integer.toString(serverToInitialize));
    logError(msg);
    // We manage the list of servers to initialize in order :
    // - to test at the end that all expected servers have reconnected
    //   after their import and with the right genId
    // - to update the task with the server(s) where this test failed
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
      for (DSInfo dsi : getReplicasList())
        ieContext.startList.add(dsi.getDsId());
    else
      ieContext.startList.add(serverToInitialize);
    // We manage the list of servers with which a flow control can be enabled
    for (DSInfo dsi : getReplicasList())
    {
      ieContext.initializeTask = initTask;
      if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        ieContext.setAckVal(dsi.getDsId(), 0);
    }
    // The number of entries to be exported is the number of entries under
    // the base DN entry and the base entry itself.
    long entryCount = this.countEntries();
    ieContext.setCounters(entryCount, entryCount);
    // Send start message to the peer
    InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
        serviceID, serverID, target, target2, entryCount);
    broker.publish(initializeMessage);
    try
    // loop for the case where the exporter is the initiator
    int attempt = 0;
    boolean done = false;
    while ((!done) && (++attempt<2)) // attempt loop
    {
      exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
      try
      {
        ieContext.exportTarget = serverToInitialize;
        if (initTask != null)
          ieContext.initializeTask = initTask;
        ieContext.initializeCounters(this.countEntries());
        ieContext.msgCnt = 0;
        ieContext.initNumLostConnections = broker.getNumLostConnections();
        ieContext.initWindow = initWindow;
      // Notify the peer of the success
      DoneMsg doneMsg = new DoneMsg(serverID,
          initializeMessage.getDestination());
      broker.publish(doneMsg);
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            serviceID, serverID, serverToInitialize, serverRunningTheTask,
            ieContext.entryCount, initWindow);
        broker.publish(initTargetMsg);
        // Wait for all servers to be ok
        waitForRemoteStartOfInit();
        // Servers that left in the list are those for which we could not test
        // that they have been successfully initialized.
        if (!ieContext.failureList.isEmpty())
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
                  ieContext.failureList.toString()));
        }
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
        // Notify the peer of the success
        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
        broker.publish(doneMsg);
      }
      catch(DirectoryException exportException)
      {
        // Give priority to the first exception raised - stored in the context
        if (ieContext.exception != null)
          exportRootException = ieContext.exception;
        else
          exportRootException = exportException;
      }
      if (debugEnabled())
        TRACER.debugInfo(
           "[IE] In " + this.monitor.getMonitorInstanceName()
           + " export ends with "
           + " connected=" + broker.isConnected()
           + " exportRootException=" + exportRootException);
      if (exportRootException != null)
      {
        try
        {
          // Handling the errors during export
          // Note: we could have lost the connection and another thread
          //       the listener one) has already managed to reconnect.
          //       So we MUST rely on the test broker.isConnected()
          //       ONLY to do 'wait to be reconnected by another thread'
          //       (if not yet reconnected already).
          if (!broker.isConnected())
          {
            // We are still disconnected, so we wait for the listener thread
            // to reconnect - wait 10s
            if (debugEnabled())
              TRACER.debugInfo(
                "[IE] Exporter wait for reconnection by the listener thread");
            int att=0;
            while ((!broker.shuttingDown()) &&
                (!broker.isConnected())&& (++att<100))
              try { Thread.sleep(100); } catch(Exception e){}
          }
          if ((initTask != null) && broker.isConnected() &&
              (serverToInitialize != RoutableMsg.ALL_SERVERS))
          {
            // NewAttempt case : In the case where
            // - it's not an InitializeAll
            // - AND the previous export attempt failed
            // - AND we are (now) connected
            // - and we own the task and this task is not an InitializeAll
            // Let's :
            // - sleep to let time to the other peer to reconnect if needed
            // - and launch another attempt
            try { Thread.sleep(1000); } catch(Exception e){}
            logError(NOTE_RESENDING_INIT_TARGET.get((exportRootException!=null?
                exportRootException.getLocalizedMessage():"")));
            continue;
          }
          ErrorMsg errorMsg =
              new ErrorMsg(serverToInitialize,
                  exportRootException.getMessageObject());
          broker.publish(errorMsg);
        }
        catch(Exception e)
        {
          // Ignore the failure raised while proceeding the root failure
        }
      }
      // We are always done for this export ...
      // ... except in the NewAttempt case (see above)
      done = true;
    } // attempt loop
    // Wait for all servers to be ok, and build the failure list
    waitForRemoteEndOfInit();
    // Servers that left in the list are those for which we could not test
    // that they have been successfully initialized.
    if (!ieContext.failureList.isEmpty())
    {
      if (exportRootException == null)
        exportRootException = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
              Long.toString(getGenerationID()),
              ieContext.failureList.toString()));
    }
    if (contextAcquired)
      releaseIEContext();
    }
    catch(DirectoryException de)
    {
      // Notify the peer of the failure
      ErrorMsg errorMsg =
        new ErrorMsg(target,
                         de.getMessageObject());
      broker.publish(errorMsg);
      if (contextAcquired)
        releaseIEContext();
      throw(de);
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
        Integer.toString(serverID),
      serviceID,
      Integer.toString(target2));
        serviceID,
        Integer.toString(serverToInitialize),
      (exportRootException!=null?exportRootException.getLocalizedMessage():""));
    logError(msg);
    if (exportRootException != null)
    {
      throw(exportRootException);
    }
  }
  /*
   * For all remote servers in tht start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
   */
  private void waitForRemoteStartOfInit()
  {
    int waitResultAttempt = 0;
    Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
      replicasWeAreWaitingFor.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
      "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    boolean done = true;
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "[IE] wait for start dsid " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + this.getGenerationID());
        if (ieContext.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still not doing the Full Update ... retry later
            done = false;
            try
            { Thread.sleep(100); } catch (InterruptedException e) {}
            waitResultAttempt++;
            break;
          }
          else
          {
            // this one is ok
            replicasWeAreWaitingFor.remove(dsi.getDsId());
          }
        }
      }
    }
    while ((!done) && (waitResultAttempt<1200) // 2mn
        && (!broker.shuttingDown()));
    // Add to the failure list the servers that were here at start time but
    // that never ended with the right generationId.
    for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
      ieContext.failureList.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for start ends with " + ieContext.failureList);
  }
  /*
   * For all remote servers in tht start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
   */
  private void waitForRemoteEndOfInit()
  {
    int waitResultAttempt = 0;
    Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
      replicasWeAreWaitingFor.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    // In case some new servers appear during the init, we want them to be
    // considered in the processing of sorting the successfully initialized
    // and the others
    for (DSInfo dsi : getReplicasList())
      replicasWeAreWaitingFor.add(dsi.getDsId());
    boolean done = true;
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "[IE] wait for end dsid " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + this.getGenerationID());
        if (!ieContext.failureList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still doing the Full Update ... retry later
            done = false;
            try
            { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s
            waitResultAttempt++;
            break;
          }
          else
          {
            // this one is done with the Full Update
            if (dsi.getGenerationId() == this.getGenerationID())
            {
              // and with the expected generationId
              replicasWeAreWaitingFor.remove(dsi.getDsId());
            }
          }
        }
      }
    }
    while ((!done) && (!broker.shuttingDown())); // infinite wait
    // Add to the failure list the servers that were here at start time but
    // that never ended with the right generationId.
    for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
      ieContext.failureList.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end ends with " + ieContext.failureList);
  }
  /**
@@ -1398,33 +1772,42 @@
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * Processes an error message received while an export is
   * on going, or an import will start.
   *
   * @param errorMsg The error message received.
   */
  void abandonImportExport(ErrorMsg errorMsg)
  private void processErrorMsg(ErrorMsg errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (debugEnabled())
      TRACER.debugVerbose(
          " abandonImportExport:" + this.serverID +
          " serviceID: " + this.serviceID +
          " Error Msg received: " + errorMsg);
    if (ieContext != null)
    {
      ieContext.setException(new DirectoryException(ResultCode.OTHER,
        errorMsg.getDetails()));
      if (ieContext.initializeTask instanceof InitializeTask)
      if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        updateTaskCompletionState(ieContext.getException());
        // The ErrorMsg is received while we have started an initialization
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails()));
        releaseIEContext();
        /*
         * This can happen :
         * - on the first InitReqMsg sent when source in not known for example
         * - on the next attempt when source crashed and did not reconnect
         *   even after the nextInitAttemptDelay
         * During the import, the ErrorMsg will be received by receiveEntryBytes
         */
        if (ieContext.initializeTask instanceof InitializeTask)
        {
          // Update the task that initiated the import
          ((InitializeTask)ieContext.initializeTask).
          updateTaskCompletionState(ieContext.getException());
          releaseIEContext();
        }
      }
      else
      {
        // When we are the exporter in the case of initializeAll
        // exporting must not be stopped on the first error.
      }
    }
  }
@@ -1442,24 +1825,72 @@
    {
      try
      {
        msg = broker.receive();
        // In the context of the total update, we don't want any automatic
        // re-connection done transparently by the broker because of a better
        // RS or because of a connection failure.
        // We want to be notified of topology change in order to track a
        // potential disconnection of the exporter.
        msg = broker.receive(false, false, true);
        if (debugEnabled())
          TRACER.debugVerbose(
              " sid:" + serverID +
              " base DN:" + serviceID +
              " Import EntryBytes received " + msg);
          TRACER.debugInfo(
              "[IE] In " + this.monitor.getMonitorInstanceName() +
            ", receiveEntryBytes " + msg);
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
          if (broker.shuttingDown())
          {
            // The server is in the shutdown process
            return null;
          }
          else
          {
            // Handle connection issues
            if (ieContext.getException() == null)
              ieContext.setException(new DirectoryException(
                  ResultCode.OTHER,
                  ERR_INIT_RS_DISCONNECTION_DURING_IMPORT.get(
                      broker.getReplicationServer())));
            return null;
          }
        }
        // Check good sequentiality of msg received
        if (msg instanceof EntryMsg)
        {
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters(countEntryLimits(entryBytes));
          if (ieContext.exporterProtocolVersion >=
            ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // check the msgCnt of the msg received to check sequenciality
            if (++ieContext.msgCnt != entryMsg.getMsgId())
            {
              if (ieContext.getException() == null)
                ieContext.setException(new DirectoryException(ResultCode.OTHER,
                    ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
                        String.valueOf(ieContext.msgCnt),
                        String.valueOf(entryMsg.getMsgId()))));
              return null;
            }
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            {
              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  this.serverID,
                  entryMsg.getSenderID(),
                  ieContext.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
                TRACER.debugInfo(
                    "[IE] In " + this.monitor.getMonitorInstanceName() +
                    ", publish InitializeRcvAckMsg" + amsg);
            }
          }
          return entryBytes;
        }
        else if (msg instanceof DoneMsg)
@@ -1474,22 +1905,43 @@
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMsg errorMsg = (ErrorMsg)msg;
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
            errorMsg.getDetails()));
          return null;
          if (ieContext.getException() == null)
          {
            ErrorMsg errMsg = (ErrorMsg)msg;
            if (errMsg.getCreationTime() > ieContext.startTime)
            {
              ieContext.setException(
                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
              return null;
            }
          }
        }
        else
        {
          // Other messages received during an import are trashed
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if ((msg instanceof TopologyMsg) &&
              (!this.isRemoteDSConnected(ieContext.importSource)))
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      this.serviceID,
                      Integer.toString(this.serverID),
                      Integer.toString(ieContext.importSource)));
            if (ieContext.getException()==null)
              ieContext.setException(new DirectoryException(ResultCode.OTHER,
                errMsg));
            return null;
          }
        }
      }
      catch(Exception e)
      {
        // TODO: i18n
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
          Message.raw("received an unexpected message type" +
          e.getLocalizedMessage())));
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
      }
    }
  }
@@ -1540,27 +1992,108 @@
   *
   * @throws IOException when an error occurred.
   */
  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) throws IOException
  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
  throws IOException
  {
    // If an error was raised - like receiving an ErrorMsg
    // we just let down the export.
    if (ieContext.getException() != null)
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
    // build the message
    EntryMsg entryMessage = new EntryMsg(
        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
    // Waiting the slowest loop
    while (!broker.shuttingDown())
    {
      IOException ioe = new IOException(ieContext.getException().getMessage());
      ieContext = null;
      throw ioe;
      // If an error was raised - like receiving an ErrorMsg from a remote
      // server that have been stored by the listener thread in the ieContext,
      // we just abandon the export by throwing an exception.
      if (ieContext.getException() != null)
        throw(new IOException(ieContext.getException().getMessage()));
      int slowestServerId = ieContext.getSlowestServer();
      if (!isRemoteDSConnected(slowestServerId))
      {
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
                Integer.toString(ieContext.getSlowestServer()))));
        // .. and abandon the export by throwing an exception.
        IOException ioe =
          new IOException("IOException with nested DirectoryException");
        ioe.initCause(ieContext.getException());
        throw ioe;
      }
      int ourLastExportedCnt = ieContext.msgCnt;
      int slowestCnt = ieContext.ackVals.get(slowestServerId);
      if (debugEnabled())
        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
      if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
        // our export is too far beyond the slowest importer - let's wait
        try { Thread.sleep(100); } catch(Exception e) {}
        // process any connection error
        if ((broker.hasConnectionError())||
            (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
        {
          // publish failed - store the error in the ieContext ...
          DirectoryException de = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
                  Integer.toString(broker.getRsServerId())));
          if (ieContext.getException() == null)
            ieContext.setException(de);
          // .. and abandon the export by throwing an exception.
          throw new IOException(de.getMessage());
        }
      }
      else
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] slowest got to us => stop waiting");
        break;
      }
    } // Waiting the slowest loop
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
    // publish the message
    boolean sent = broker.publish(entryMessage, false);
    // process any publish error
    if (((!sent)||
        (broker.hasConnectionError()))||
        (broker.getNumLostConnections() != ieContext.initNumLostConnections))
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
              Integer.toString(broker.getRsServerId())));
      if (ieContext.getException() == null)
        ieContext.setException(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
    EntryMsg entryMessage = new EntryMsg(
        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length);
    broker.publish(entryMessage);
    // publish succeeded
    try
    {
      ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
    }
    catch (DirectoryException de)
    {
      // store the error in the ieContext ...
      if (ieContext.getException() == null)
        ieContext.setException(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
  }
@@ -1614,127 +2147,285 @@
  }
  /**
   * Initializes this domain from another source server.
   * Initializes asynchronously this domain from a remote source server.
   * Before returning from this call, for the provided task :
   * - the progressing counters are updated during the initialization using
   *   setTotal() and setLeft().
   * - the end of the initialization using updateTaskCompletionState().
   * <p>
   * When this method is called, a request for initialization will
   * be sent to the source server asking for initialization.
   * When this method is called, a request for initialization is sent to the
   * remote source server requesting initialization.
   * <p>
   * The {@link #exportBackend(OutputStream)} will therefore be called
   * on the source server, and the {@link #importBackend(InputStream)}
   * will be called on his server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param source   The server-id of the source from which to initialize.
   *                 The source can be discovered using the
   *                 {@link #getReplicasList()} method.
   *
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   *                            The task state is updated.
   */
  public void initializeFromRemote(int source, Task initTask)
  throws DirectoryException
  {
    Message errMsg = null;
    if (debugEnabled())
      TRACER.debugInfo("Entering initializeFromRemote");
      TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
    if (!broker.isConnected())
    {
      if (initTask instanceof InitializeTask)
      {
        InitializeTask task = (InitializeTask) initTask;
        task.updateTaskCompletionState(
            new DirectoryException(
                ResultCode.OTHER, ERR_INITIALIZATION_FAILED_NOCONN.get(
                    getServiceID())));
      }
      return;
      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
    }
    acquireIEContext(true);
    ieContext.initializeTask = initTask;
    // We must not test here whether the remote source is connected to
    // the topology by testing if it stands in the replicas list since.
    // In the case of a re-attempt of initialization, the listener thread is
    // running this method directly coming from initailize() method and did
    // not processed any topology message in between the failure and the
    // new attempt.
    try
    {
      // We must immediatly acquire a context to store the task inside
      // The context will be used when we (the listener thread) will receive
      // the InitializeTargetMsg, process the import, and at the end
      // update the task.
    InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
        serviceID, serverID, source);
      acquireIEContext(true);  //test and set if no import already in progress
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
          serviceID, serverID, source, this.initWindow);
    // Publish Init request msg
    broker.publish(initializeMsg);
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
    // .. we expect to receive entries or err after that
      // The normal success processing is now to receive InitTargetMsg then
      // entries from the remote server.
      // The error cases are :
      // - either local error immediatly caught below
      // - a remote error we will receive as an ErrorMsg
    }
    catch(DirectoryException de)
    {
      errMsg = de.getMessageObject();
    }
    catch(Exception e)
    {
      // Should not happen
      errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
          e.getLocalizedMessage());
      logError(errMsg);
    }
    // When error, update the task and raise the error to the caller
    if (errMsg != null)
    {
      // No need to call here updateTaskCompletionState - will be done
      // by the caller
      releaseIEContext();
      DirectoryException de = new DirectoryException(
          ResultCode.OTHER,
          errMsg);
      throw (de);
    }
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   * Processes an InitializeTargetMsg received from a remote server
   * meaning processes an initialization from the entries expected to be
   * received now.
   *
   * @param initTargetMsgReceived The message received from the remote server.
   *
   * @param requestorServerId The serverId of the server that requested the
   *                          initialization meaning the server where the
   *                          task has initially been created (this server,
   *                          or the remote server).
   */
  void initialize(InitializeTargetMsg initializeMessage)
  throws DirectoryException
  void initialize(InitializeTargetMsg initTargetMsgReceived,
      int requestorServerId)
  {
    DirectoryException de = null;
    InitializeTask initFromtask = null;
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
        Integer.toString(serverID),
      serviceID,
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
    // Go into full update status
    setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
    if (initializeMessage.getRequestorID() == serverID)
    {
      // The import responds to a request we did so the IEContext
      // is already acquired
    }
    else
    {
      acquireIEContext(true);
    }
    ieContext.importSource = initializeMessage.getsenderID();
    ieContext.entryLeftCount = initializeMessage.getEntryCount();
    ieContext.setCounters(
        initializeMessage.getEntryCount(),
        initializeMessage.getEntryCount());
    int source = initTargetMsgReceived.getSenderID();
    try
    {
      // Log starting
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
          Integer.toString(serverID),
          serviceID,
          Long.toString(initTargetMsgReceived.getInitiatorID()));
      logError(msg);
      // Go into full update status
      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
      // Acquire an import context if no already done (and initialize).
      if (initTargetMsgReceived.getInitiatorID() == this.serverID)
      {
        // The initTargetMsgReceived received is the answer to a request that
        // we (this server) sent previously. In this case, so the IEContext
        // has been already acquired when the request was published in order
        // to store the task (to be updated with the status at the end).
      }
      else
      {
        // The initTargetMsgReceived is for an import initiated by the remote
        // server.
        // Test and set if no import already in progress
        acquireIEContext(true);
      }
      // Initialize stuff
      ieContext.importSource = source;
      ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieContext.exporterProtocolVersion = getProtocolVersion(source);
      initFromtask = (InitializeTask)ieContext.initializeTask;
      // Lauch the import
      importBackend(new ReplInputStream(this));
      broker.reStart();
    }
    catch (DirectoryException e)
    {
      de = e;
      // Store the exception raised. It will be considered if no other exception
      // has been previously stored in  the context
      if (ieContext.getException() == null)
        ieContext.setException(e);
    }
    finally
    {
      if ((ieContext != null)  && (ieContext.getException() != null))
        de = ieContext.getException();
      if (debugEnabled())
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends import with exception=" + ieContext.getException()
          + " connected=" + broker.isConnected());
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      // It is necessary to restart (reconnect to RS) for different reasons
      //   - when everything went well, reconnect in order to exchange
      //     new state, new generation ID
      //   - when we have connection failure, reconnect to retry a new import
      //     right here, right now
      // we never want retryOnFailure if we fails reconnecting in the restart.
      broker.reStart(false);
      if (ieContext.getException() != null)
      {
        ((InitializeTask)ieContext.initializeTask).
        updateTaskCompletionState(de);
        if (broker.isConnected() && (initFromtask != null)
            && (++ieContext.attemptCnt<2))
        {
          // Worth a new attempt
          // since initFromtask is in this server, connection is ok
          try
          {
            // Wait for the exporter to stabilize - eventually reconnect as
            // well if it was connected to the same RS than the one we lost ...
            Thread.sleep(1000);
            // Restart the whole import protocol exchange by sending again
            // the request
            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                ieContext.getException().getLocalizedMessage()));
            broker.publish(ieContext.initReqMsgSent);
            ieContext.initializeCounters(0);
            ieContext.exception = null;
            ieContext.msgCnt = 0;
            // Processing of the received initTargetMsgReceived is done
            // let's wait for the next one
            return;
          }
          catch(Exception e)
          {
            // An error occurs when sending a new request for a new import.
            // This error is not stored, prefering to keep the initial one.
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
          }
        }
      }
      releaseIEContext();
    }
    // Sends up the root error.
    if (de != null)
      // ===================
      // No new attempt case
      if (debugEnabled())
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends initialization with exception=" + ieContext.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromtask
          + " attempt=" + ieContext.attemptCnt);
      try
      {
        if (broker.isConnected() && (ieContext.getException() != null))
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
              ieContext.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        else // !broker.isConnected()
        {
          // Don't try to reconnect here.
          // The current running thread is the listener thread and will loop on
          // receive() that is expected to manage reconnects attempt.
        }
        // Update the task that initiated the import must be the last thing.
        // Particularly, broker.restart() after import success must be done
        // before some other operations/tasks to be launched,
        // like resetting the generation ID.
        if (initFromtask != null)
        {
          initFromtask.updateTaskCompletionState(ieContext.getException());
        }
      }
      finally
      {
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            Integer.toString(serverID),
            serviceID,
            Long.toString(initTargetMsgReceived.getInitiatorID()),
            (ieContext.getException()!=null?
                ieContext.getException().getLocalizedMessage():""));
        logError(msg);
        releaseIEContext();
      } // finally
    } // finally
  }
  /**
   * Return the protocol version of the DS related to the provided serverid.
   * Returns -1 when the protocol version is not known.
   * @param dsServerId The provided serverid.
   * @return The procotol version.
   */
  short getProtocolVersion(int dsServerId)
  {
    short protocolVersion = -1;
    for (DSInfo dsi : getReplicasList())
    {
      throw de;
      if (dsi.getDsId() == dsServerId)
      {
        protocolVersion = dsi.getProtocolVersion();
        break;
      }
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
        Integer.toString(serverID),
      serviceID,
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
    return protocolVersion;
  }
  /**
@@ -1887,15 +2578,7 @@
    if (debugEnabled())
      TRACER.debugInfo(
          "Server id " + serverID + " and domain " + serviceID
          + "resetGenerationId" + generationIdNewValue);
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
      throw new DirectoryException(
         resultCode, message);
    }
          + " resetGenerationId " + generationIdNewValue);
    ResetGenerationIdMsg genIdMessage = null;
@@ -1907,6 +2590,16 @@
    {
      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
    }
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID,
          Integer.toString(serverID),
          Long.toString(genIdMessage.getGenerationId()));
      throw new DirectoryException(
         resultCode, message);
    }
    broker.publish(genIdMessage);
    // check that at least one ReplicationServer did change its generation-id
@@ -2410,6 +3103,7 @@
    // Wait for the listener thread to stop
    if (listenerThread != null)
      listenerThread.waitForShutdown();
  }
  /**