From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improve the replica initialization protocol, especially flow control and handling of connection outages.

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 1236 +++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 965 insertions(+), 271 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7729aa3..2ee1ec0 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -72,8 +72,9 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.opends.messages.Category;
 import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.task.Task;
 import org.opends.server.loggers.debug.DebugTracer;
@@ -91,11 +92,13 @@
 import org.opends.server.replication.protocol.HeartbeatMsg;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
+import org.opends.server.replication.protocol.InitializeRcvAckMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 
@@ -284,6 +287,15 @@
   private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
     new HashMap<Integer,Integer>();
 
+  /**
+   * Window size used during initialization .. between
+   * - the initializer/exporter DS that listens/waits acknowledges and that
+   *   slows down data msg publishing based on the slowest server
+   * - and each initialized/importer DS that publishes acknowledges each
+   *   WINDOW/2 data msg received.
+   */
+  protected int initWindow = 100;
+
   /* Status related monitoring fields */
 
   // Indicates the date when the status changed. This may be used to indicate
@@ -328,6 +340,28 @@
    *                   to the Replication Domain.
    *                   This identifier should be different for each server that
    *                   is participating to a given Replication Domain.
+   * @param initWindow Window used during initialization.
+   */
+  public ReplicationDomain(String serviceID, int serverID,int initWindow)
+  {
+    this.serviceID = serviceID;
+    this.serverID = serverID;
+    this.initWindow = initWindow;
+    this.state = new ServerState();
+    this.generator = new ChangeNumberGenerator(serverID, state);
+
+    domains.put(serviceID, this);
+  }
+
+  /**
+   * Creates a ReplicationDomain with the provided parameters.
+   *
+   * @param serviceID  The identifier of the Replication Domain to which
+   *                   this object is participating.
+   * @param serverID   The identifier of the server that is participating
+   *                   to the Replication Domain.
+   *                   This identifier should be different for each server that
+   *                   is participating to a given Replication Domain.
    */
   public ReplicationDomain(String serviceID, int serverID)
   {
@@ -557,6 +591,22 @@
   }
 
   /**
+   * Check if a remote replica (DS) is connected to the topology based on
+   * the TopologyMsg we received when the remote replica connected or
+   * disconnected.
+   *
+   * @param serverId The provided serverId of the remote replica
+   * @return whether the remote replica is connected or not.
+   */
+  public boolean isRemoteDSConnected(int serverId)
+  {
+    for (DSInfo remoteDS : getReplicasList())
+      if (remoteDS.getDsId() == serverId)
+        return true;
+    return false;
+  }
+
+  /**
    * Gets the States of all the Replicas currently in the
    * Topology.
    * When this method is called, a Monitoring message will be sent
@@ -708,7 +758,8 @@
 
   /**
    * Receives an update message from the replicationServer.
-   * also responsible for updating the list of pending changes
+   * The other types of messages are processed in an opaque way for the caller.
+   * Also responsible for updating the list of pending changes
    * @return the received message - null if none
    */
   UpdateMsg receive()
@@ -717,11 +768,11 @@
 
     while (update == null)
     {
-      InitializeRequestMsg initMsg = null;
+      InitializeRequestMsg initReqMsg = null;
       ReplicationMsg msg;
       try
       {
-        msg = broker.receive(true);
+        msg = broker.receive(true, true, false);
         if (msg == null)
         {
           // The server is in the shutdown process
@@ -741,54 +792,58 @@
         {
           // Another server requests us to provide entries
           // for a total update
-          initMsg = (InitializeRequestMsg)msg;
+          initReqMsg = (InitializeRequestMsg)msg;
         }
         else if (msg instanceof InitializeTargetMsg)
         {
           // Another server is exporting its entries to us
-          InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
+          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
 
-          try
-          {
-            // This must be done while we are still holding the
-            // broker lock because we are now going to receive a
-            // bunch of entries from the remote server and we
-            // want the import thread to catch them and
-            // not the ListenerThread.
-            initialize(importMsg);
-          }
-          catch(DirectoryException de)
-          {
-            // Returns an error message to notify the sender
-            ErrorMsg errorMsg =
-              new ErrorMsg(importMsg.getsenderID(),
-                  de.getMessageObject());
-            MessageBuilder mb = new MessageBuilder();
-            mb.append(de.getMessageObject());
-            TRACER.debugInfo(Message.toString(mb.toMessage()));
-            broker.publish(errorMsg);
-            logError(de.getMessageObject());
-          }
+          // This must be done while we are still holding the
+          // broker lock because we are now going to receive a
+          // bunch of entries from the remote server and we
+          // want the import thread to catch them and
+          // not the ListenerThread.
+          initialize(initTargetMsg, initTargetMsg.getSenderID());
         }
         else if (msg instanceof ErrorMsg)
         {
+          ErrorMsg errorMsg = (ErrorMsg)msg;
           if (ieContext != null)
           {
             // This is an error termination for the 2 following cases :
             // - either during an export
             // - or before an import really started
-            //   For example, when we publish a request and the
-            //  replicationServer did not find any import source.
-            abandonImportExport((ErrorMsg)msg);
+            //    For example, when we publish a request and the
+            //    replicationServer did not find the import source.
+            //
+            // A remote error during the import will be received in the
+            // receiveEntryBytes() method.
+            //
+            if (debugEnabled())
+              TRACER.debugInfo(
+                  "[IE] processErrorMsg:" + this.serverID +
+                  " serviceID: " + this.serviceID +
+                  " Error Msg received: " + errorMsg);
+
+            if (errorMsg.getCreationTime() > ieContext.startTime)
+            {
+              // consider only ErrorMsg that relate to the current import/export
+              processErrorMsg(errorMsg);
+            }
+            else
+            {
+              // Simply log - happens when the ErrorMsg relates to a previous
+              // attempt of initialization while we have started a new one
+              // on this side.
+              logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
+            }
           }
           else
           {
-            /*
-             * Log error message
-             */
-            ErrorMsg errorMsg = (ErrorMsg)msg;
-            logError(ERR_ERROR_MSG_RECEIVED.get(
-                errorMsg.getDetails()));
+            // Simply log - happens if import/export has been terminated
+            // on our side before receiving this ErrorMsg.
+            logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
           }
         }
         else if (msg instanceof ChangeStatusMsg)
@@ -801,6 +856,15 @@
           update = (UpdateMsg) msg;
           generator.adjust(update.getChangeNumber());
         }
+        else if (msg instanceof InitializeRcvAckMsg)
+        {
+          if (ieContext != null)
+          {
+            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
+            ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
+          }
+          // Discard: no import/export is running (should never happen).
+        }
       }
       catch (SocketTimeoutException e)
       {
@@ -815,10 +879,11 @@
       // entries to the remote can be handled by the other
       // replay thread when they call this method and therefore the
       // broker.receive() method.
-      if (initMsg != null)
+      if (initReqMsg != null)
       {
         // Do this work in a thread to allow replay thread continue working
-        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
+        ExportThread exportThread = new ExportThread(
+            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
         exportThread.start();
       }
     }
@@ -989,23 +1054,29 @@
    */
 
   /**
-   * This thread is launched when we want to export data to another server that
-   * has requested to be initialized with the data of our backend.
+   * This thread is launched when we want to export data to another server.
+   *
+   * When a task is created locally (so this local server is the initiator
+   * of the export, for example with dsreplication initialize-all),
+   * this thread is NOT used: the task thread runs the export instead.
    */
   private class ExportThread extends DirectoryThread
   {
-    // Id of server that will receive updates
-    private int target;
+    // Id of server that will be initialized
+    private int serverToInitialize;
+    private int initWindow;
 
     /**
      * Constructor for the ExportThread.
      *
-     * @param i Id of server that will receive updates
+     * @param serverToInitialize serverId of server that will receive entries
      */
-    public ExportThread(int i)
+    public ExportThread(int serverToInitialize, int initWindow)
     {
-      super("Export thread " + serverID);
-      this.target = i;
+      super("Export thread from serverId=" + serverID
+          + " to serverId=" + serverToInitialize);
+      this.serverToInitialize = serverToInitialize;
+      this.initWindow = initWindow;
     }
 
     /**
@@ -1015,22 +1086,20 @@
     public void run()
     {
       if (debugEnabled())
-      {
-        TRACER.debugInfo("Export thread starting.");
-      }
-
+        TRACER.debugInfo("[IE] starting " + this.getName());
       try
       {
-        initializeRemote(target, target, null);
+        initializeRemote(serverToInitialize, serverToInitialize, null,
+            initWindow);
       } catch (DirectoryException de)
       {
-      // An error message has been sent to the peer
-      // Nothing more to do locally
+        // An error message has been sent to the peer
+        // This server is not the initiator of the export so there is
+        // nothing more to do locally.
       }
+
       if (debugEnabled())
-      {
-        TRACER.debugInfo("Export thread stopping.");
-      }
+        TRACER.debugInfo("[IE] ending " + this.getName());
     }
   }
 
@@ -1052,13 +1121,49 @@
     // The count for the entry not yet processed
     long entryLeftCount = 0;
 
-    // The exception raised when any
+    // Exception raised during the initialization.
     DirectoryException exception = null;
 
-    // A boolean indicating if the context is related to an
-    // import or an export.
+    // Whether the context is related to an import or an export.
     boolean importInProgress;
 
+    // Current counter of messages exchanged during the initialization
+    int msgCnt = 0;
+
+    // Number of connections lost when we start the initialization.
+    // Will help counting connections lost during initialization,
+    int initNumLostConnections = 0;
+
+    // Request message sent when this server has the initializeFromRemote task.
+    InitializeRequestMsg initReqMsgSent = null;
+
+    // Start time of the initialization process. ErrorMsg timestamped
+    // before this startTime will be ignored.
+    long startTime;
+
+    // List of replicas (DS) connected to the topology when
+    // initialization started.
+    Set<Integer> startList = new HashSet<Integer>(0);
+
+    // List of replicas (DS) with a failure (disconnected from the topology)
+    // since the initialization started.
+    Set<Integer> failureList = new HashSet<Integer>(0);
+
+    // Flow control during initialization
+    // - for each remote server, counter of messages received
+    private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
+    // - serverId of the slowest server (the one with the smallest non null
+    //   counter)
+    private int slowestServerId = -1;
+
+    short exporterProtocolVersion = -1;
+
+    // Window used during this initialization
+    int initWindow;
+
+    // Number of attempts already made for this initialization
+    short attemptCnt;
+
     /**
      * Creates a new IEContext.
      *
@@ -1069,19 +1174,21 @@
     public IEContext(boolean importInProgress)
     {
       this.importInProgress = importInProgress;
+      this.startTime = System.currentTimeMillis();
+      this.attemptCnt = 0;
+
     }
 
     /**
      * Initializes the import/export counters with the provider value.
      * @param total Total number of entries to be processed.
-     * @param left Remaining number of entries to be processed.
      * @throws DirectoryException if an error occurred.
      */
-    public void setCounters(long total, long left)
+    private void initializeCounters(long total)
       throws DirectoryException
     {
       entryCount = total;
-      entryLeftCount = left;
+      entryLeftCount = total;
 
       if (initializeTask != null)
       {
@@ -1193,7 +1300,42 @@
     {
       this.exception = exception;
     }
-  }
+
+    /**
+     * Records the id of the last EntryMsg acknowledged by a receiver
+     * (importer) server; updated via the listener thread.
+     * @param serverId serverId of the acknowledger/receiver/importer server.
+     * @param numAck   id of the message received.
+     */
+    public void setAckVal(int serverId, int numAck)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck);
+
+      this.ackVals.put(serverId, numAck);
+
+      // Recompute the slowest server: the one with the smallest ack value.
+      slowestServerId = serverId;
+      for (Integer sid : this.ackVals.keySet())
+        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
+          slowestServerId = sid;
+    }
+
+    /**
+     * Returns the serverId of the server that acknowledged the smallest
+     * EntryMsg id.
+     * @return serverId of the server with the smallest acknowledged id,
+     *         or -1 when no ack value has been recorded yet.
+     */
+    public int getSlowestServer()
+    {
+      if (debugEnabled())
+        TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId
+            + " " + this.ackVals.get(slowestServerId));
+
+      return this.slowestServerId;
+    }
+}
   /**
    * Verifies that the given string represents a valid source
    * from which this server can be initialized.
@@ -1260,34 +1402,10 @@
   public void initializeRemote(int target, Task initTask)
   throws DirectoryException
   {
-    initializeRemote(target, serverID, initTask);
 
-    if (target == RoutableMsg.ALL_SERVERS)
-    {
-      // Check for the status of all remote servers to check if they
-      // are all finished with the import.
-      boolean done = true;
-      do
-      {
-        done = true;
-        for (DSInfo dsi : getReplicasList())
-        {
-          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
-          {
-            done = false;
-            try
-            {
-              Thread.sleep(100);
-            } catch (InterruptedException e)
-            {
-              // just loop again.
-            }
-            break;
-          }
-        }
-      }
-      while (!done);
-    }
+    initializeRemote(target, this.serverID, initTask, this.initWindow);
+
+
   }
 
   /**
@@ -1295,76 +1413,332 @@
    * specified by the target argument when this initialization specifying the
    * server that requests the initialization.
    *
-   * @param target The target that should be initialized.
-   * @param target2 The server that initiated the export.
-   * @param initTask The task that triggers this initialization and that should
-   *  be updated with its progress.
+   * @param serverToInitialize The target server that should be initialized.
+   * @param serverRunningTheTask The server that initiated the export. It can
+   * be the serverID of this server, or the serverID of a remote server.
+   * @param initTask The task in this server that triggers this initialization
+   * and that should be updated with its progress. Null when the export is done
+   * following a request coming from a remote server (task is remote).
+   * @param initWindow The value of the initialization window for flow control
+   * between the importer and the exporter.
    *
-   * @exception DirectoryException When an error occurs.
+   * @exception DirectoryException When an error occurs. No exception raised
+   * means success.
    */
-  protected void initializeRemote(int target, int target2,
-    Task initTask) throws DirectoryException
+  protected void initializeRemote(int serverToInitialize,
+      int serverRunningTheTask, Task initTask, int initWindow)
+  throws DirectoryException
   {
-    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
-        Integer.toString(serverID),
-      serviceID,
-      Integer.toString(target2));
-    logError(msg);
+    DirectoryException exportRootException = null;
+    boolean contextAcquired = false;
 
-    boolean contextAcquired=false;
-
+    // Acquire and initialize the export context
     acquireIEContext(false);
     contextAcquired = true;
-    ieContext.exportTarget = target;
 
-    if (initTask != null)
+    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
+        Integer.toString(serverID), Long.toString(countEntries()), serviceID,
+        Integer.toString(serverToInitialize));
+    logError(msg);
+
+    // We manage the list of servers to initialize in order :
+    // - to test at the end that all expected servers have reconnected
+    //   after their import and with the right genId
+    // - to update the task with the server(s) where this test failed
+
+    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
+      for (DSInfo dsi : getReplicasList())
+        ieContext.startList.add(dsi.getDsId());
+    else
+      ieContext.startList.add(serverToInitialize);
+
+    // We manage the list of servers with which a flow control can be enabled
+    for (DSInfo dsi : getReplicasList())
     {
-      ieContext.initializeTask = initTask;
+      if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        ieContext.setAckVal(dsi.getDsId(), 0);
     }
 
-    // The number of entries to be exported is the number of entries under
-    // the base DN entry and the base entry itself.
-    long entryCount = this.countEntries();
-
-
-    ieContext.setCounters(entryCount, entryCount);
-
-    // Send start message to the peer
-    InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
-        serviceID, serverID, target, target2, entryCount);
-
-    broker.publish(initializeMessage);
-
-    try
+    // loop for the case where the exporter is the initiator
+    int attempt = 0;
+    boolean done = false;
+    while ((!done) && (++attempt<2)) // attempt loop
     {
-      exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
+      try
+      {
+        ieContext.exportTarget = serverToInitialize;
+        if (initTask != null)
+          ieContext.initializeTask = initTask;
+        ieContext.initializeCounters(this.countEntries());
+        ieContext.msgCnt = 0;
+        ieContext.initNumLostConnections = broker.getNumLostConnections();
+        ieContext.initWindow = initWindow;
 
-      // Notify the peer of the success
-      DoneMsg doneMsg = new DoneMsg(serverID,
-          initializeMessage.getDestination());
-      broker.publish(doneMsg);
+        // Send start message to the peer
+        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
+            serviceID, serverID, serverToInitialize, serverRunningTheTask,
+            ieContext.entryCount, initWindow);
 
+        broker.publish(initTargetMsg);
+
+        // Wait for all servers to be ok
+        waitForRemoteStartOfInit();
+
+        // Servers in the failure list are those that were not seen entering
+        // the full update status while we waited for the start of the init.
+        if (!ieContext.failureList.isEmpty())
+        {
+          throw new DirectoryException(
+              ResultCode.OTHER,
+              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
+                  ieContext.failureList.toString()));
+        }
+
+        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
+
+        // Notify the peer of the success
+        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
+        broker.publish(doneMsg);
+
+      }
+      catch(DirectoryException exportException)
+      {
+        // Give priority to the first exception raised - stored in the context
+        if (ieContext.exception != null)
+          exportRootException = ieContext.exception;
+        else
+          exportRootException = exportException;
+      }
+
+      if (debugEnabled())
+        TRACER.debugInfo(
+           "[IE] In " + this.monitor.getMonitorInstanceName()
+           + " export ends with "
+           + " connected=" + broker.isConnected()
+           + " exportRootException=" + exportRootException);
+
+      if (exportRootException != null)
+      {
+        try
+        {
+          // Handling the errors during export
+
+          // Note: we could have lost the connection and another thread
+          //       (the listener one) has already managed to reconnect.
+          //       So we MUST rely on the test broker.isConnected()
+          //       ONLY to do 'wait to be reconnected by another thread'
+          //       (if not yet reconnected already).
+
+          if (!broker.isConnected())
+          {
+            // We are still disconnected, so we wait for the listener thread
+            // to reconnect - wait 10s
+            if (debugEnabled())
+              TRACER.debugInfo(
+                "[IE] Exporter wait for reconnection by the listener thread");
+            int att=0;
+            while ((!broker.shuttingDown()) &&
+                (!broker.isConnected())&& (++att<100))
+              try { Thread.sleep(100); } catch(Exception e){}
+          }
+
+          if ((initTask != null) && broker.isConnected() &&
+              (serverToInitialize != RoutableMsg.ALL_SERVERS))
+          {
+            // NewAttempt case : In the case where
+            // - it's not an InitializeAll
+            // - AND the previous export attempt failed
+            // - AND we are (now) connected
+            // - and we own the task and this task is not an InitializeAll
+            // Let's :
+            // - sleep to let time to the other peer to reconnect if needed
+            // - and launch another attempt
+            try { Thread.sleep(1000); } catch(Exception e){}
+            logError(NOTE_RESENDING_INIT_TARGET.get((exportRootException!=null?
+                exportRootException.getLocalizedMessage():"")));
+
+            continue;
+          }
+
+          ErrorMsg errorMsg =
+              new ErrorMsg(serverToInitialize,
+                  exportRootException.getMessageObject());
+          broker.publish(errorMsg);
+        }
+        catch(Exception e)
+        {
+          // Ignore the failure raised while processing the root failure
+        }
+      }
+
+      // We are always done for this export ...
+      // ... except in the NewAttempt case (see above)
+      done = true;
+
+    } // attempt loop
+
+    // Wait for all servers to be ok, and build the failure list
+    waitForRemoteEndOfInit();
+
+    // Servers left in the failure list are those for which we could not
+    // check that they have been successfully initialized.
+    if (!ieContext.failureList.isEmpty())
+    {
+      if (exportRootException == null)
+        exportRootException = new DirectoryException(ResultCode.OTHER,
+          ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
+              Long.toString(getGenerationID()),
+              ieContext.failureList.toString()));
+    }
+
+    if (contextAcquired)
       releaseIEContext();
-    }
-    catch(DirectoryException de)
-    {
-      // Notify the peer of the failure
-      ErrorMsg errorMsg =
-        new ErrorMsg(target,
-                         de.getMessageObject());
-      broker.publish(errorMsg);
-
-      if (contextAcquired)
-        releaseIEContext();
-
-      throw(de);
-    }
 
     msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
         Integer.toString(serverID),
-      serviceID,
-      Integer.toString(target2));
+        serviceID,
+        Integer.toString(serverToInitialize),
+      (exportRootException!=null?exportRootException.getLocalizedMessage():""));
     logError(msg);
+
+    if (exportRootException != null)
+    {
+      throw(exportRootException);
+    }
+
+  }
+
+  /*
+   * For all remote servers in the start list,
+   * - wait until it has entered the full update status (bounded wait)
+   * - build the failureList with the servers that did not
+   */
+  private void waitForRemoteStartOfInit()
+  {
+    int waitResultAttempt = 0;
+    Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
+
+    for (Integer sid : ieContext.startList)
+      replicasWeAreWaitingFor.add(sid);
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+      "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
+
+    boolean done = true;
+    do
+    {
+      done = true;
+      for (DSInfo dsi : getReplicasList())
+      {
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "[IE] wait for start dsid " + dsi.getDsId()
+            + " " + dsi.getStatus()
+            + " " + dsi.getGenerationId()
+            + " " + this.getGenerationID());
+        if (ieContext.startList.contains(dsi.getDsId()))
+        {
+          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
+          {
+            // this one is still not doing the Full Update ... retry later
+            done = false;
+            try
+            { Thread.sleep(100); } catch (InterruptedException e) {}
+            waitResultAttempt++;
+            break;
+          }
+          else
+          {
+            // this one is ok
+            replicasWeAreWaitingFor.remove(dsi.getDsId());
+          }
+        }
+      }
+    }
+    while ((!done) && (waitResultAttempt<1200) // 2mn
+        && (!broker.shuttingDown()));
+
+    // Add to the failure list the servers that were here at start time but
+    // that were never seen in the full update status.
+    for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
+      ieContext.failureList.add(sid);
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "[IE] wait for start ends with " + ieContext.failureList);
+  }
+
+  /*
+   * For all remote servers in the start list (and those that appeared since),
+   * - wait until it has finished the import
+   * - build the failureList from those lacking the expected generationID
+   */
+  private void waitForRemoteEndOfInit()
+  {
+    int waitResultAttempt = 0;
+    Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(0);
+
+    for (Integer sid : ieContext.startList)
+      replicasWeAreWaitingFor.add(sid);
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
+
+    // In case some new servers appear during the init, we want them to be
+    // considered in the processing of sorting the successfully initialized
+    // and the others
+    for (DSInfo dsi : getReplicasList())
+      replicasWeAreWaitingFor.add(dsi.getDsId());
+
+    boolean done = true;
+    do
+    {
+      done = true;
+      for (DSInfo dsi : getReplicasList())
+      {
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "[IE] wait for end dsid " + dsi.getDsId()
+            + " " + dsi.getStatus()
+            + " " + dsi.getGenerationId()
+            + " " + this.getGenerationID());
+        if (!ieContext.failureList.contains(dsi.getDsId()))
+        {
+          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
+          {
+            // this one is still doing the Full Update ... retry later
+            done = false;
+            try
+            { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s
+            waitResultAttempt++;
+            break;
+          }
+          else
+          {
+            // this one is done with the Full Update
+            if (dsi.getGenerationId() == this.getGenerationID())
+            {
+              // and with the expected generationId
+              replicasWeAreWaitingFor.remove(dsi.getDsId());
+            }
+          }
+        }
+      }
+    }
+    while ((!done) && (!broker.shuttingDown())); // infinite wait
+
+    // Add to the failure list the servers that were here at start time but
+    // that never ended with the right generationId.
+    for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
+      ieContext.failureList.add(sid);
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "[IE] wait for end ends with " + ieContext.failureList);
+
+
   }
 
   /**
@@ -1398,33 +1772,42 @@
   }
 
   /**
-   * Processes an error message received while an import/export is
-   * on going.
+   * Processes an error message received while an export is ongoing,
+   * or while an import has been requested but has not started yet.
+   *
    * @param errorMsg The error message received.
    */
-  void abandonImportExport(ErrorMsg errorMsg)
+  private void processErrorMsg(ErrorMsg errorMsg)
   {
-    // FIXME TBD Treat the case where the error happens while entries
-    // are being exported
-
-    if (debugEnabled())
-      TRACER.debugVerbose(
-          " abandonImportExport:" + this.serverID +
-          " serviceID: " + this.serviceID +
-          " Error Msg received: " + errorMsg);
-
     if (ieContext != null)
     {
-      ieContext.setException(new DirectoryException(ResultCode.OTHER,
-        errorMsg.getDetails()));
-
-      if (ieContext.initializeTask instanceof InitializeTask)
+      if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
       {
-        // Update the task that initiated the import
-        ((InitializeTask)ieContext.initializeTask).
-        updateTaskCompletionState(ieContext.getException());
+        // An initialization was started here: only the first error is kept
+        if (ieContext.getException() == null)
+          ieContext.setException(new DirectoryException(ResultCode.OTHER,
+              errorMsg.getDetails()));
 
-        releaseIEContext();
+        /*
+         * This can happen:
+         * - on the first InitReqMsg sent, e.g. when the source is not known
+         * - on the next attempt, when the source crashed and did not reconnect
+         *   even after the nextInitAttemptDelay.
+         * During the import, the ErrorMsg is received by receiveEntryBytes.
+         */
+        if (ieContext.initializeTask instanceof InitializeTask)
+        {
+          // Update the task that initiated the import, then free the context
+          ((InitializeTask)ieContext.initializeTask).
+          updateTaskCompletionState(ieContext.getException());
+
+          releaseIEContext();
+        }
+      }
+      else
+      {
+        // When we are the exporter in the case of initializeAll,
+        // exporting must not be stopped on the first error: ignore it.
       }
     }
   }
@@ -1442,24 +1825,72 @@
     {
       try
       {
-        msg = broker.receive();
+        // In the context of the total update, we don't want any automatic
+        // re-connection done transparently by the broker because of a better
+        // RS or because of a connection failure.
+        // We want to be notified of topology change in order to track a
+        // potential disconnection of the exporter.
+        msg = broker.receive(false, false, true);
 
         if (debugEnabled())
-          TRACER.debugVerbose(
-              " sid:" + serverID +
-              " base DN:" + serviceID +
-              " Import EntryBytes received " + msg);
+          TRACER.debugInfo(
+              "[IE] In " + this.monitor.getMonitorInstanceName() +
+            ", receiveEntryBytes " + msg);
+
         if (msg == null)
         {
-          // The server is in the shutdown process
-          return null;
+          if (broker.shuttingDown())
+          {
+            // The server is in the shutdown process
+            return null;
+          }
+          else
+          {
+            // Handle connection issues
+            if (ieContext.getException() == null)
+              ieContext.setException(new DirectoryException(
+                  ResultCode.OTHER,
+                  ERR_INIT_RS_DISCONNECTION_DURING_IMPORT.get(
+                      broker.getReplicationServer())));
+            return null;
+          }
         }
 
+        // Check good sequentiality of msg received
         if (msg instanceof EntryMsg)
         {
           EntryMsg entryMsg = (EntryMsg)msg;
           byte[] entryBytes = entryMsg.getEntryBytes();
           ieContext.updateCounters(countEntryLimits(entryBytes));
+
+          if (ieContext.exporterProtocolVersion >=
+            ProtocolVersion.REPLICATION_PROTOCOL_V4)
+          {
+            // check the msgCnt of the msg received to check sequentiality
+            if (++ieContext.msgCnt != entryMsg.getMsgId())
+            {
+              if (ieContext.getException() == null)
+                ieContext.setException(new DirectoryException(ResultCode.OTHER,
+                    ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
+                        String.valueOf(ieContext.msgCnt),
+                        String.valueOf(entryMsg.getMsgId()))));
+              return null;
+            }
+
+            // send the ack used for flow control management
+            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
+            {
+              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
+                  this.serverID,
+                  entryMsg.getSenderID(),
+                  ieContext.msgCnt);
+              broker.publish(amsg, false);
+              if (debugEnabled())
+                TRACER.debugInfo(
+                    "[IE] In " + this.monitor.getMonitorInstanceName() +
+                    ", publish InitializeRcvAckMsg" + amsg);
+            }
+          }
           return entryBytes;
         }
         else if (msg instanceof DoneMsg)
@@ -1474,22 +1905,43 @@
           // This is an error termination during the import
           // The error is stored and the import is ended
           // by returning null
-          ErrorMsg errorMsg = (ErrorMsg)msg;
-          ieContext.setException(new DirectoryException(ResultCode.OTHER,
-            errorMsg.getDetails()));
-          return null;
+          if (ieContext.getException() == null)
+          {
+            ErrorMsg errMsg = (ErrorMsg)msg;
+            if (errMsg.getCreationTime() > ieContext.startTime)
+            {
+              ieContext.setException(
+                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
+              return null;
+            }
+          }
         }
         else
         {
-          // Other messages received during an import are trashed
+          // Other messages received during an import are trashed except
+          // the topologyMsg.
+          if ((msg instanceof TopologyMsg) &&
+              (!this.isRemoteDSConnected(ieContext.importSource)))
+          {
+            Message errMsg =
+              Message.raw(Category.SYNC, Severity.NOTICE,
+                  ERR_INIT_EXPORTER_DISCONNECTION.get(
+                      this.serviceID,
+                      Integer.toString(this.serverID),
+                      Integer.toString(ieContext.importSource)));
+            if (ieContext.getException()==null)
+              ieContext.setException(new DirectoryException(ResultCode.OTHER,
+                errMsg));
+            return null;
+          }
         }
       }
       catch(Exception e)
       {
         // TODO: i18n
-        ieContext.setException(new DirectoryException(ResultCode.OTHER,
-          Message.raw("received an unexpected message type" +
-          e.getLocalizedMessage())));
+        if (ieContext.getException() == null)
+          ieContext.setException(new DirectoryException(ResultCode.OTHER,
+            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
       }
     }
   }
@@ -1540,27 +1992,108 @@
    *
    * @throws IOException when an error occurred.
    */
-  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) throws IOException
+  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
+  throws IOException
   {
-    // If an error was raised - like receiving an ErrorMsg
-    // we just let down the export.
-    if (ieContext.getException() != null)
+    if (debugEnabled())
+      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
+
+    // build the message, stamped with the next msgCnt value
+    EntryMsg entryMessage = new EntryMsg(
+        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
+        ++ieContext.msgCnt);
+
+    // Flow control loop: wait for the slowest importer
+    while (!broker.shuttingDown())
     {
-      IOException ioe = new IOException(ieContext.getException().getMessage());
-      ieContext = null;
-      throw ioe;
+      // If an error was raised - like an ErrorMsg received from a remote
+      // server and stored by the listener thread in the ieContext -
+      // we just abandon the export by throwing an exception.
+      if (ieContext.getException() != null)
+        throw(new IOException(ieContext.getException().getMessage()));
+
+      int slowestServerId = ieContext.getSlowestServer();
+      if (!isRemoteDSConnected(slowestServerId))
+      {
+        ieContext.setException(new DirectoryException(ResultCode.OTHER,
+            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
+                Integer.toString(ieContext.getSlowestServer()))));
+        // .. and abandon the export by throwing an exception.
+        IOException ioe =
+          new IOException("IOException with nested DirectoryException");
+        ioe.initCause(ieContext.getException());
+        throw ioe;
+      }
+
+      int ourLastExportedCnt = ieContext.msgCnt;
+      int slowestCnt = ieContext.ackVals.get(slowestServerId);
+
+      if (debugEnabled())
+        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
+            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
+
+      if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
+      {
+        if (debugEnabled())
+          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
+
+        // our export is too far beyond the slowest importer - let's wait
+        try { Thread.sleep(100); } catch(Exception e) {}
+
+        // process any connection error
+        if ((broker.hasConnectionError())||
+            (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
+        {
+          // connection lost - store the error in the ieContext ...
+          DirectoryException de = new DirectoryException(ResultCode.OTHER,
+              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
+                  Integer.toString(broker.getRsServerId())));
+          if (ieContext.getException() == null)
+            ieContext.setException(de);
+          // .. and abandon the export by throwing an exception.
+          throw new IOException(de.getMessage());
+        }
+      }
+      else
+      {
+        if (debugEnabled())
+          TRACER.debugInfo("[IE] slowest got to us => stop waiting");
+        break;
+      }
+    } // Flow control loop
+
+    if (debugEnabled())
+      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
+
+    // publish the message
+    boolean sent = broker.publish(entryMessage, false);
+
+    // process any publish or connection error
+    if (((!sent)||
+        (broker.hasConnectionError()))||
+        (broker.getNumLostConnections() != ieContext.initNumLostConnections))
+    {
+      // publish failed - store the error in the ieContext ...
+      DirectoryException de = new DirectoryException(ResultCode.OTHER,
+          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
+              Integer.toString(broker.getRsServerId())));
+      if (ieContext.getException() == null)
+        ieContext.setException(de);
+      // .. and abandon the export by throwing an exception.
+      throw new IOException(de.getMessage());
     }
 
-    EntryMsg entryMessage = new EntryMsg(
-        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length);
-    broker.publish(entryMessage);
-
+    // publish succeeded - account for the exported entry
     try
     {
       ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
     }
     catch (DirectoryException de)
     {
+      // store the error in the ieContext (unless one is already stored) ...
+      if (ieContext.getException() == null)
+        ieContext.setException(de);
+      // .. and abandon the export by throwing an exception.
       throw new IOException(de.getMessage());
     }
   }
@@ -1614,127 +2147,285 @@
   }
 
   /**
-   * Initializes this domain from another source server.
+   * Asynchronously initializes this domain from a remote source server.
+   * The provided task is stored in the import context so that :
+   * - its progress counters can be updated during the initialization using
+   *   setTotal() and setLeft(),
+   * - its completion can be reported using updateTaskCompletionState().
    * <p>
-   * When this method is called, a request for initialization will
-   * be sent to the source server asking for initialization.
+   * When this method is called, an InitializeRequestMsg is published to the
+   * topology, asking the remote source server to start the initialization.
    * <p>
-   * The {@link #exportBackend(OutputStream)} will therefore be called
-   * on the source server, and the {@link #importBackend(InputStream)}
-   * will be called on his server.
-   * <p>
-   * The InputStream and OutpuStream given as a parameter to those
-   * methods will be connected through the replication protocol.
    *
    * @param source   The server-id of the source from which to initialize.
    *                 The source can be discovered using the
    *                 {@link #getReplicasList()} method.
+   *
    * @param initTask The task that launched the initialization
    *                 and should be updated of its progress.
    *
    * @throws DirectoryException If it was not possible to publish the
    *                            Initialization message to the Topology.
+   *                            The task is left for the caller to update.
    */
   public void initializeFromRemote(int source, Task initTask)
   throws DirectoryException
   {
+    Message errMsg = null;
+
     if (debugEnabled())
-      TRACER.debugInfo("Entering initializeFromRemote");
+      TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
 
     if (!broker.isConnected())
     {
-      if (initTask instanceof InitializeTask)
-      {
-        InitializeTask task = (InitializeTask) initTask;
-        task.updateTaskCompletionState(
-            new DirectoryException(
-                ResultCode.OTHER, ERR_INITIALIZATION_FAILED_NOCONN.get(
-                    getServiceID())));
-      }
-      return;
+      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
     }
 
-    acquireIEContext(true);
-    ieContext.initializeTask = initTask;
+    // We must not test here whether the remote source is connected to
+    // the topology by checking that it stands in the replicas list.
+    // In the case of a re-attempt of initialization, the listener thread is
+    // running this method directly coming from the initialize() method and
+    // has not processed any topology message in between the failure and the
+    // new attempt.
+    try
+    {
+      // We must immediately acquire a context to store the task inside.
+      // The context will be used when we (the listener thread) will receive
+      // the InitializeTargetMsg, process the import, and at the end
+      // update the task.
 
-    InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
-        serviceID, serverID, source);
+      acquireIEContext(true);  //test and set if no import already in progress
+      ieContext.initializeTask = initTask;
+      ieContext.attemptCnt = 0;
+      ieContext.initReqMsgSent = new InitializeRequestMsg(
+          serviceID, serverID, source, this.initWindow);
 
-    // Publish Init request msg
-    broker.publish(initializeMsg);
+      // Publish Init request msg
+      broker.publish(ieContext.initReqMsgSent);
 
-    // .. we expect to receive entries or err after that
+      // The normal success processing is now to receive InitTargetMsg then
+      // entries from the remote server.
+      // The error cases are :
+      // - either a local error immediately caught below
+      // - or a remote error that we will receive as an ErrorMsg
+    }
+    catch(DirectoryException de)
+    {
+      errMsg = de.getMessageObject();
+    }
+    catch(Exception e)
+    {
+      // Should not happen
+      errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
+          e.getLocalizedMessage());
+      logError(errMsg);
+    }
+
+    // On error, release the context and raise the error to the caller
+    if (errMsg != null)
+    {
+      // No need to call updateTaskCompletionState here - it will be done
+      // by the caller
+      releaseIEContext();
+      DirectoryException de = new DirectoryException(
+          ResultCode.OTHER,
+          errMsg);
+      throw (de);
+    }
   }
 
   /**
-   * Initializes the domain's backend with received entries.
-   * @param initializeMessage The message that initiated the import.
-   * @exception DirectoryException Thrown when an error occurs.
+   * Processes an InitializeTargetMsg received from a remote server,
+   * that is, runs the import of the entries that are expected to be
+   * received next.
+   *
+   * @param initTargetMsgReceived The message received from the remote server.
+   *
+   * @param requestorServerId The serverId of the server that requested the
+   *                          initialization, meaning the server where the
+   *                          task has initially been created (this server,
+   *                          or the remote server).
    */
-  void initialize(InitializeTargetMsg initializeMessage)
-  throws DirectoryException
+  void initialize(InitializeTargetMsg initTargetMsgReceived,
+      int requestorServerId)
   {
-    DirectoryException de = null;
+    InitializeTask initFromtask = null;
 
-    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
-        Integer.toString(serverID),
-      serviceID,
-      Long.toString(initializeMessage.getRequestorID()));
-    logError(msg);
+    if (debugEnabled())
+      TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
 
-    // Go into full update status
-    setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
-
-    if (initializeMessage.getRequestorID() == serverID)
-    {
-      // The import responds to a request we did so the IEContext
-      // is already acquired
-    }
-    else
-    {
-      acquireIEContext(true);
-    }
-
-    ieContext.importSource = initializeMessage.getsenderID();
-    ieContext.entryLeftCount = initializeMessage.getEntryCount();
-    ieContext.setCounters(
-        initializeMessage.getEntryCount(),
-        initializeMessage.getEntryCount());
+    int source = initTargetMsgReceived.getSenderID();
 
     try
     {
+      // Log the start of the import
+      Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
+          Integer.toString(serverID),
+          serviceID,
+          Long.toString(initTargetMsgReceived.getInitiatorID()));
+      logError(msg);
+
+      // Go into full update status
+      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
+
+      // Acquire an import context if not already done (and initialize).
+      if (initTargetMsgReceived.getInitiatorID() == this.serverID)
+      {
+        // The initTargetMsgReceived is the answer to a request that
+        // we (this server) sent previously. In this case, the IEContext
+        // has already been acquired when the request was published, in order
+        // to store the task (to be updated with the status at the end).
+      }
+      else
+      {
+        // The initTargetMsgReceived is for an import initiated by the remote
+        // server.
+        // Test and set if no import already in progress
+        acquireIEContext(true);
+      }
+
+      // Initialize the import context from the received message
+      ieContext.importSource = source;
+      ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
+      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
+      // Protocol version is -1 when not known.
+      ieContext.exporterProtocolVersion = getProtocolVersion(source);
+      initFromtask = (InitializeTask)ieContext.initializeTask;
+
+      // Launch the import
       importBackend(new ReplInputStream(this));
-      broker.reStart();
+
     }
     catch (DirectoryException e)
     {
-      de = e;
+      // Store the exception raised. It will be considered if no other exception
+      // has been previously stored in the context
+      if (ieContext.getException() == null)
+        ieContext.setException(e);
     }
     finally
     {
-      if ((ieContext != null)  && (ieContext.getException() != null))
-        de = ieContext.getException();
+      if (debugEnabled())
+        TRACER.debugInfo("[IE] Domain=" + this
+          + " ends import with exception=" + ieContext.getException()
+          + " connected=" + broker.isConnected());
 
-      // Update the task that initiated the import
-      if ((ieContext != null ) && (ieContext.initializeTask != null))
+      // It is necessary to restart (reconnect to RS) for different reasons
+      //   - when everything went well, reconnect in order to exchange
+      //     new state, new generation ID
+      //   - when we have a connection failure, reconnect to retry a new import
+      //     right here, right now
+      // We never want retryOnFailure if we fail reconnecting in the restart.
+      broker.reStart(false);
+
+      if (ieContext.getException() != null)
       {
-        ((InitializeTask)ieContext.initializeTask).
-        updateTaskCompletionState(de);
+        if (broker.isConnected() && (initFromtask != null)
+            && (++ieContext.attemptCnt<2))
+        {
+          // Worth a new attempt
+          // since initFromtask is in this server, connection is ok
+          try
+          {
+
+            // Wait for the exporter to stabilize - possibly reconnect as
+            // well if it was connected to the same RS as the one we lost ...
+            Thread.sleep(1000);
+
+            // Restart the whole import protocol exchange by sending again
+            // the request
+            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
+                ieContext.getException().getLocalizedMessage()));
+
+            broker.publish(ieContext.initReqMsgSent);
+
+            ieContext.initializeCounters(0);
+            ieContext.exception = null;
+            ieContext.msgCnt = 0;
+
+            // Processing of the received initTargetMsgReceived is done,
+            // let's wait for the next one
+            return;
+          }
+          catch(Exception e)
+          {
+            // An error occurred when sending a new request for a new import.
+            // This error is not stored, preferring to keep the initial one.
+            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
+              e.getLocalizedMessage(),
+              ieContext.getException().getLocalizedMessage()));
+          }
+        }
       }
-      releaseIEContext();
-    }
 
-    // Sends up the root error.
-    if (de != null)
+      // ===================
+      // No new attempt case
+
+      if (debugEnabled())
+        TRACER.debugInfo("[IE] Domain=" + this
+          + " ends initialization with exception=" + ieContext.getException()
+          + " connected=" + broker.isConnected()
+          + " task=" + initFromtask
+          + " attempt=" + ieContext.attemptCnt);
+
+      try
+      {
+        if (broker.isConnected() && (ieContext.getException() != null))
+        {
+          // Let's notify the exporter
+          ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
+              ieContext.getException().getMessageObject());
+          broker.publish(errorMsg);
+        }
+        else // not connected, or no error to report
+        {
+          // Don't try to reconnect here.
+          // The current running thread is the listener thread and will loop on
+          // receive() that is expected to manage reconnection attempts.
+        }
+
+        // Updating the task that initiated the import must be the last thing.
+        // In particular, broker.reStart() after import success must be done
+        // before some other operations/tasks are launched,
+        // like resetting the generation ID.
+        if (initFromtask != null)
+        {
+          initFromtask.updateTaskCompletionState(ieContext.getException());
+        }
+      }
+      finally
+      {
+
+        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
+            Integer.toString(serverID),
+            serviceID,
+            Long.toString(initTargetMsgReceived.getInitiatorID()),
+            (ieContext.getException()!=null?
+                ieContext.getException().getLocalizedMessage():""));
+        logError(msg);
+        releaseIEContext();
+      } // finally
+    } // finally
+  }
+
+  /**
+   * Returns the protocol version of the DS having the provided server id,
+   * or -1 when that DS is not found in the replicas list.
+   * @param dsServerId The server id of the DS to look up.
+   * @return The protocol version, or -1 when not known.
+   */
+  short getProtocolVersion(int dsServerId)
+  {
+    short protocolVersion = -1;
+    for (DSInfo dsi : getReplicasList())
     {
-      throw de;
+      if (dsi.getDsId() == dsServerId)
+      {
+        protocolVersion = dsi.getProtocolVersion();
+        break;
+      }
     }
-
-    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
-        Integer.toString(serverID),
-      serviceID,
-      Long.toString(initializeMessage.getRequestorID()));
-    logError(msg);
+    return protocolVersion;
   }
 
   /**
@@ -1887,15 +2578,7 @@
     if (debugEnabled())
       TRACER.debugInfo(
           "Server id " + serverID + " and domain " + serviceID
-          + "resetGenerationId" + generationIdNewValue);
-
-    if (!isConnected())
-    {
-      ResultCode resultCode = ResultCode.OTHER;
-      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
-      throw new DirectoryException(
-         resultCode, message);
-    }
+          + " resetGenerationId " + generationIdNewValue);
 
     ResetGenerationIdMsg genIdMessage = null;
 
@@ -1907,6 +2590,16 @@
     {
       genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
     }
+
+    if (!isConnected())
+    {
+      ResultCode resultCode = ResultCode.OTHER;
+      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID,
+          Integer.toString(serverID),
+          Long.toString(genIdMessage.getGenerationId()));
+      throw new DirectoryException(
+         resultCode, message);
+    }
     broker.publish(genIdMessage);
 
     // check that at least one ReplicationServer did change its generation-id
@@ -2410,6 +3103,7 @@
     // Wait for the listener thread to stop
     if (listenerThread != null)
       listenerThread.waitForShutdown();
+
   }
 
   /**

--
Gitblit v1.10.0