From 48312eb62361cc16c74cd7c68346c23db63a2161 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:36:12 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR

---
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                          |   51 ++++++++++++-------------
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java                             |   62 ++++++++++++++++---------------
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java |    4 +-
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java                            |    2 
 4 files changed, 60 insertions(+), 59 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 3b70e8d..a671f55 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -1488,7 +1488,7 @@
     // process:
     // This is an error termination during the import
     // The error is stored and the import is ended by returning null
-    final IEContext ieCtx = getImportExportContext();
+    final ImportExportContext ieCtx = getImportExportContext();
     LocalizableMessage msg = null;
     switch (importErrorMessageId)
     {
@@ -3689,41 +3689,40 @@
 
     Backend backend = getBackend();
 
-    IEContext ieCtx = getImportExportContext();
+    ImportExportContext ieCtx = getImportExportContext();
     try
     {
       if (!backend.supportsLDIFImport())
       {
         ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
             ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
+        return;
       }
-      else
-      {
-        importConfig = new LDIFImportConfig(input);
-        List<DN> includeBranches = new ArrayList<DN>();
-        includeBranches.add(getBaseDN());
-        importConfig.setIncludeBranches(includeBranches);
-        importConfig.setAppendToExistingData(false);
-        importConfig.setSkipDNValidation(true);
-        // We should not validate schema for replication
-        importConfig.setValidateSchema(false);
-        // Allow fractional replication ldif import plugin to be called
-        importConfig.setInvokeImportPlugins(true);
-        // Reset the follow import flag and message before starting the import
-        importErrorMessageId = -1;
 
-        // TODO How to deal with rejected entries during the import
-        importConfig.writeRejectedEntries(
-            getFileForPath("logs" + File.separator +
-            "replInitRejectedEntries").getAbsolutePath(),
-            ExistingFileBehavior.OVERWRITE);
+      importConfig = new LDIFImportConfig(input);
+      List<DN> includeBranches = new ArrayList<DN>();
+      includeBranches.add(getBaseDN());
+      importConfig.setIncludeBranches(includeBranches);
+      importConfig.setAppendToExistingData(false);
+      importConfig.setSkipDNValidation(true);
+      // We should not validate schema for replication
+      importConfig.setValidateSchema(false);
+      // Allow fractional replication ldif import plugin to be called
+      importConfig.setInvokeImportPlugins(true);
+      // Reset the follow import flag and message before starting the import
+      importErrorMessageId = -1;
 
-        // Process import
-        preBackendImport(backend);
-        backend.importLDIF(importConfig);
+      // TODO How to deal with rejected entries during the import
+      File rejectsFile =
+          getFileForPath("logs" + File.separator + "replInitRejectedEntries");
+      importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(),
+          ExistingFileBehavior.OVERWRITE);
 
-        stateSavingDisabled = false;
-      }
+      // Process import
+      preBackendImport(backend);
+      backend.importLDIF(importConfig);
+
+      stateSavingDisabled = false;
     }
     catch(Exception e)
     {
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 1323777..5ef2bfb 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -242,8 +242,8 @@
    * The context related to an import or export being processed
    * Null when none is being processed.
    */
-  private final AtomicReference<IEContext> ieContext =
-      new AtomicReference<IEContext>();
+  private final AtomicReference<ImportExportContext> importExportContext =
+      new AtomicReference<ImportExportContext>();
 
   /**
    * The Thread waiting for incoming update messages for this domain and pushing
@@ -663,7 +663,7 @@
    * @return the info related to this remote server if it is connected,
    *                  null is the server is NOT connected.
    */
-  private DSInfo isRemoteDSConnected(int dsId)
+  private DSInfo getConnectedRemoteDS(int dsId)
   {
     return getReplicaInfos().get(dsId);
   }
@@ -814,7 +814,7 @@
         else if (msg instanceof ErrorMsg)
         {
           ErrorMsg errorMsg = (ErrorMsg)msg;
-          IEContext ieCtx = ieContext.get();
+          ImportExportContext ieCtx = importExportContext.get();
           if (ieCtx != null)
           {
             /*
@@ -867,7 +867,7 @@
         }
         else if (msg instanceof InitializeRcvAckMsg)
         {
-          IEContext ieCtx = ieContext.get();
+          ImportExportContext ieCtx = importExportContext.get();
           if (ieCtx != null)
           {
             InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
@@ -1108,10 +1108,10 @@
   }
 
   /**
-   * This class contain the context related to an import or export
-   * launched on the domain.
+   * This class contains the context related to an import or export launched on
+   * the domain.
    */
-  protected class IEContext
+  protected class ImportExportContext
   {
     /** The private task that initiated the operation. */
     private Task initializeTask;
@@ -1190,7 +1190,7 @@
      *                         for and import, false if the IEContext
      *                         will be used for and export.
      */
-    private IEContext(boolean importInProgress)
+    private ImportExportContext(boolean importInProgress)
     {
       this.importInProgress = importInProgress;
       this.startTime = System.currentTimeMillis();
@@ -1356,7 +1356,7 @@
 
       // Recompute the server with the minAck returned,means the slowest server.
       slowestServerId = serverId;
-      for (Integer sid : ieContext.get().ackVals.keySet())
+      for (Integer sid : importExportContext.get().ackVals.keySet())
       {
         if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
         {
@@ -1462,7 +1462,7 @@
       int serverRunningTheTask, Task initTask, int initWindow)
   throws DirectoryException
   {
-    final IEContext ieCtx = acquireIEContext(false);
+    final ImportExportContext ieCtx = acquireIEContext(false);
 
     /*
     We manage the list of servers to initialize in order :
@@ -1583,7 +1583,8 @@
               logger.trace(
                 "[IE] Exporter wait for reconnection by the listener thread");
             int att=0;
-            while (!broker.shuttingDown() && !broker.isConnected()
+            while (!broker.shuttingDown()
+                && !broker.isConnected()
                 && ++att < 100)
             {
               try { Thread.sleep(100); }
@@ -1591,7 +1592,8 @@
             }
           }
 
-          if (initTask != null && broker.isConnected()
+          if (initTask != null
+              && broker.isConnected()
               && serverToInitialize != RoutableMsg.ALL_SERVERS)
           {
             /*
@@ -1665,7 +1667,7 @@
    * - wait it has finished the import and present the expected generationID,
    * - build the failureList.
    */
-  private void waitForRemoteStartOfInit(IEContext ieCtx)
+  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
   {
     final Set<Integer> replicasWeAreWaitingFor =
         new HashSet<Integer>(ieCtx.startList);
@@ -1723,7 +1725,7 @@
    * - wait it has finished the import and present the expected generationID,
    * - build the failureList.
    */
-  private void waitForRemoteEndOfInit(IEContext ieCtx)
+  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
   {
     final Set<Integer> replicasWeAreWaitingFor =
         new HashSet<Integer>(ieCtx.startList);
@@ -1758,7 +1760,7 @@
           continue;
         }
 
-        DSInfo dsInfo = isRemoteDSConnected(serverId);
+        DSInfo dsInfo = getConnectedRemoteDS(serverId);
         if (dsInfo == null)
         {
           /*
@@ -1823,11 +1825,11 @@
    * Acquire and initialize the import/export context, verifying no other
    * import/export is in progress.
    */
-  private IEContext acquireIEContext(boolean importInProgress)
+  private ImportExportContext acquireIEContext(boolean importInProgress)
       throws DirectoryException
   {
-    final IEContext ieCtx = new IEContext(importInProgress);
-    if (!ieContext.compareAndSet(null, ieCtx))
+    final ImportExportContext ieCtx = new ImportExportContext(importInProgress);
+    if (!importExportContext.compareAndSet(null, ieCtx))
     {
       // Rejects 2 simultaneous exports
       LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
@@ -1838,7 +1840,7 @@
 
   private void releaseIEContext()
   {
-    ieContext.set(null);
+    importExportContext.set(null);
   }
 
   /**
@@ -1847,7 +1849,7 @@
    *
    * @param errorMsg The error message received.
    */
-  private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
+  private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx)
   {
     //Exporting must not be stopped on the first error, if we run initialize-all
     if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
@@ -1885,7 +1887,7 @@
     ReplicationMsg msg;
     while (true)
     {
-      IEContext ieCtx = ieContext.get();
+      ImportExportContext ieCtx = importExportContext.get();
       try
       {
         // In the context of the total update, we don't want any automatic
@@ -1983,7 +1985,7 @@
           // Other messages received during an import are trashed except
           // the topologyMsg.
           if (msg instanceof TopologyMsg
-              && isRemoteDSConnected(ieCtx.importSource) == null)
+              && getConnectedRemoteDS(ieCtx.importSource) == null)
           {
             LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get(
                 getBaseDNString(), getServerId(), ieCtx.importSource);
@@ -2056,7 +2058,7 @@
           Arrays.toString(lDIFEntry));
 
     // build the message
-    IEContext ieCtx = ieContext.get();
+    ImportExportContext ieCtx = importExportContext.get();
     EntryMsg entryMessage = new EntryMsg(
         getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
         ++ieCtx.msgCnt);
@@ -2075,7 +2077,7 @@
       }
 
       int slowestServerId = ieCtx.getSlowestServer();
-      if (isRemoteDSConnected(slowestServerId)==null)
+      if (getConnectedRemoteDS(slowestServerId) == null)
       {
         ieCtx.setException(new DirectoryException(ResultCode.OTHER,
             ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer())));
@@ -2203,7 +2205,7 @@
       update the task.
       */
 
-      final IEContext ieCtx = acquireIEContext(true);
+      final ImportExportContext ieCtx = acquireIEContext(true);
       ieCtx.initializeTask = initTask;
       ieCtx.attemptCnt = 0;
       ieCtx.initReqMsgSent = new InitializeRequestMsg(
@@ -2263,7 +2265,7 @@
 
     int source = initTargetMsgReceived.getSenderID();
 
-    IEContext ieCtx = ieContext.get();
+    ImportExportContext ieCtx = importExportContext.get();
     try
     {
       // Log starting
@@ -2470,7 +2472,7 @@
    */
   public boolean ieRunning()
   {
-    return ieContext.get() != null;
+    return importExportContext.get() != null;
   }
 
   /**
@@ -3484,9 +3486,9 @@
    *
    * @return the Import/Export context associated to this ReplicationDomain
    */
-  protected IEContext getImportExportContext()
+  protected ImportExportContext getImportExportContext()
   {
-    return ieContext.get();
+    return importExportContext.get();
   }
 
   /**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index b3ca6b7..ec307b8 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -131,7 +131,7 @@
         String.valueOf(domain.getGenerationID())));
 
     // Add import/export monitoring attributes
-    final IEContext ieContext = domain.getImportExportContext();
+    final ImportExportContext ieContext = domain.getImportExportContext();
     if (ieContext != null)
     {
       addMonitorData(attributes, "total-update",
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 1f612fe..e0dd4c8 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -44,7 +44,7 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.service.ReplicationDomain.IEContext;
+import org.opends.server.replication.service.ReplicationDomain.*;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.testng.annotations.DataProvider;
@@ -444,7 +444,7 @@
 
   private long getLeftEntryCount(ReplicationDomain domain)
   {
-    final IEContext ieContext = domain.getImportExportContext();
+    final ImportExportContext ieContext = domain.getImportExportContext();
     if (ieContext != null)
     {
       return ieContext.getLeftEntryCount();

--
Gitblit v1.10.0