From ea629fa971db08f2267b50522360563a8fec7f86 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |  104 ++++++++++++++++++++++++++++-----------------------
 1 files changed, 57 insertions(+), 47 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index f597000..775dbc5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -67,7 +67,6 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
 import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -535,7 +534,8 @@
     throws ConfigException
   {
     super(configuration.getBaseDN().toNormalizedString(),
-          configuration.getServerId());
+          configuration.getServerId(),
+          configuration.getInitializationWindowSize());
 
     /**
      * The time in milliseconds between heartbeats from the replication
@@ -1786,7 +1786,7 @@
    */
   @Override
   protected void initializeRemote(int target, int requestorID,
-    Task initTask) throws DirectoryException
+    Task initTask, int initWindow) throws DirectoryException
   {
     if ((target == RoutableMsg.ALL_SERVERS) && fractionalConfig.isFractional())
     {
@@ -1795,7 +1795,7 @@
       throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
     } else
     {
-      super.initializeRemote(target, requestorID, initTask);
+      super.initializeRemote(target, requestorID, initTask, this.initWindow);
     }
   }
 
@@ -2449,23 +2449,25 @@
    */
   public void shutdown()
   {
-    // stop the flush thread
-    shutdown = true;
-
-    // stop the thread in charge of flushing the ServerState.
-    if (flushThread != null)
+    if (!shutdown)
     {
-      synchronized (flushThread)
+      shutdown = true;
+
+      // stop the thread in charge of flushing the ServerState.
+      if (flushThread != null)
       {
-        flushThread.notify();
+        synchronized (flushThread)
+        {
+          flushThread.notify();
+        }
       }
+
+      DirectoryServer.deregisterAlertGenerator(this);
+
+      // stop the ReplicationDomain
+      stopDomain();
     }
 
-    DirectoryServer.deregisterAlertGenerator(this);
-
-    // stop the ReplicationDomain
-    stopDomain();
-
     // wait for completion of the persistentServerState thread.
     try
     {
@@ -3945,16 +3947,15 @@
   }
 
   /**
-   * This method should trigger an import of the replicated data.
+   * This method triggers an import of the replicated data.
    *
-   * @param input                The InputStream from which
+   * @param input                The InputStream from which the data are read.
    * @throws DirectoryException  When needed.
    */
   @Override
-  public void importBackend(InputStream input) throws DirectoryException
+  protected void importBackend(InputStream input) throws DirectoryException
   {
     LDIFImportConfig importConfig = null;
-    DirectoryException de = null;
 
     Backend backend = retrievesBackend(baseDn);
 
@@ -3964,8 +3965,9 @@
       {
         Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
             backend.getBackendID().toString());
-        logError(message);
-        de = new DirectoryException(ResultCode.OTHER, message);
+        if (ieContext.getException() == null)
+          ieContext.setException(new DirectoryException(ResultCode.OTHER,
+              message));
       }
       else
       {
@@ -3997,30 +3999,33 @@
     }
     catch(Exception e)
     {
-      de = new DirectoryException(ResultCode.OTHER,
-          Message.raw(e.getLocalizedMessage()));
+      if (ieContext.getException() == null)
+        ieContext.setException(new DirectoryException(
+            ResultCode.OTHER,
+            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
     }
     finally
     {
-      // Cleanup
-      if (importConfig != null)
-      {
-        importConfig.close();
-
-        // Re-enable backend
-        closeBackendImport(backend);
-
-        backend = retrievesBackend(baseDn);
-      }
-
       try
       {
+        // Cleanup
+        if (importConfig != null)
+        {
+          importConfig.close();
+          closeBackendImport(backend); // Re-enable backend
+          backend = retrievesBackend(baseDn);
+        }
+
         loadDataState();
 
-        if (debugEnabled())
-          TRACER.debugInfo(
-              "After import, the replication plugin restarts connections" +
-              " to all RSs to provide new generation ID=" + generationId);
+        if (ieContext.getException() != null)
+        {
+          // When an error occurred during an import, most of times
+          // the generationId coming in the root entry of the imported data,
+          // is not valid anymore (partial data in the backend).
+          generationId = computeGenerationId();
+          saveGenerationId(generationId);
+        }
       }
       catch (DirectoryException fe)
       {
@@ -4029,15 +4034,15 @@
         // so we don't bother about the new Exception.
         // However if there was no Exception before we want
         // to return this Exception to the task creator.
-        if (de == null)
-          de = fe;
+        if (ieContext.getException() == null)
+          ieContext.setException(new DirectoryException(
+              ResultCode.OTHER,
+              ERR_INIT_IMPORT_FAILURE.get(fe.getLocalizedMessage())));
       }
     }
-    // Sends up the root error.
-    if (de != null)
-    {
-      throw de;
-    }
+
+    if (ieContext.getException() != null)
+      throw ieContext.getException();
   }
 
   /**
@@ -4810,14 +4815,19 @@
     }
 
     ResultCode resultCode = ResultCode.OTHER;
-    Message message = ERR_INVALID_IMPORT_SOURCE.get();
     if (cause != null)
     {
+      Message message = ERR_INVALID_IMPORT_SOURCE.get(
+          baseDn.toNormalizedString(), Integer.toString(serverId),
+          Integer.toString(source),"Details:" + cause.getLocalizedMessage());
       throw new DirectoryException(
           resultCode, message, cause);
     }
     else
     {
+      Message message = ERR_INVALID_IMPORT_SOURCE.get(
+          baseDn.toNormalizedString(), Integer.toString(serverId),
+          Integer.toString(source),"");
       throw new DirectoryException(
           resultCode, message);
     }

--
Gitblit v1.10.0