From 31153ccec78ed6187b9543babccdaf996968d6bb Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Tue, 01 May 2007 17:20:27 +0000
Subject: [PATCH] Fix for issue #1262: database becomes empty when importing a not existing ldif file.

---
 opends/src/server/org/opends/server/backends/jeb/ImportJob.java |  470 +++++++++++++++++++++++++++++-----------------------------
 1 files changed, 233 insertions(+), 237 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/ImportJob.java b/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
index 6766fc2..02120ab 100644
--- a/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
+++ b/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -150,160 +150,175 @@
     envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
     envConfig.setConfigParam("je.log.fileMax", "100000000");
 */
-    rootContainer = new RootContainer(config, backend);
-    if (ldifImportConfig.appendToExistingData())
+
+    // Create an LDIF reader. Throws an exception if the file does not exist.
+    reader = new LDIFReader(ldifImportConfig);
+
+    int msgID;
+    String message;
+    long startTime;
+    try
     {
-      rootContainer.open(config.getBackendDirectory(),
-                         config.getBackendPermission(),
-                         false, true, true, true, true, false);
-    }
-    else
-    {
-      rootContainer.open(config.getBackendDirectory(),
-                         config.getBackendPermission(),
-                         false, true, false, false, false, false);
-    }
+      rootContainer = new RootContainer(config, backend);
+      if (ldifImportConfig.appendToExistingData())
+      {
+        rootContainer.open(config.getBackendDirectory(),
+                           config.getBackendPermission(),
+                           false, true, true, true, true, false);
+      }
+      else
+      {
+        rootContainer.open(config.getBackendDirectory(),
+                           config.getBackendPermission(),
+                           false, true, false, false, false, false);
+      }
 
-    if (!ldifImportConfig.appendToExistingData())
-    {
-      // We have the writer lock on the environment, now delete the
-      // environment and re-open it. Only do this when we are
-      // importing to all the base DNs in the backend.
-      rootContainer.close();
-      EnvManager.removeFiles(config.getBackendDirectory().getPath());
-      rootContainer.open(config.getBackendDirectory(),
-                         config.getBackendPermission(),
-                         false, true, false, false, false, false);
-    }
+      if (!ldifImportConfig.appendToExistingData())
+      {
+        // We have the writer lock on the environment, now delete the
+        // environment and re-open it. Only do this when we are
+        // importing to all the base DNs in the backend.
+        rootContainer.close();
+        EnvManager.removeFiles(config.getBackendDirectory().getPath());
+        rootContainer.open(config.getBackendDirectory(),
+                           config.getBackendPermission(),
+                           false, true, false, false, false, false);
+      }
 
-    // Divide the total buffer size by the number of threads
-    // and give that much to each thread.
-    int importThreadCount = config.getImportThreadCount();
-    long bufferSize = config.getImportBufferSize() /
-         (importThreadCount*config.getBaseDNs().length);
+      // Divide the total buffer size by the number of threads
+      // and give that much to each thread.
+      int importThreadCount = config.getImportThreadCount();
+      long bufferSize = config.getImportBufferSize() /
+           (importThreadCount*config.getBaseDNs().length);
 
-    int msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
-    String message = getMessage(msgID, importThreadCount);
-    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-             message, msgID);
+      msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
+      message = getMessage(msgID, importThreadCount);
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+               message, msgID);
 
-    msgID = MSGID_JEB_IMPORT_BUFFER_SIZE;
-    message = getMessage(msgID, bufferSize);
-    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-             message, msgID);
+      msgID = MSGID_JEB_IMPORT_BUFFER_SIZE;
+      message = getMessage(msgID, bufferSize);
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+               message, msgID);
 
-    msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG;
-    message = getMessage(msgID,
-                         rootContainer.getEnvironmentConfig().toString());
-    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-             message, msgID);
+      msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG;
+      message = getMessage(msgID,
+                           rootContainer.getEnvironmentConfig().toString());
+      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+               message, msgID);
 
-    DebugLogger.debugInfo(
+      DebugLogger.debugInfo(
         rootContainer.getEnvironmentConfig().toString());
 
 
-    rootContainer.openEntryContainers(config.getBaseDNs());
+      rootContainer.openEntryContainers(config.getBaseDNs());
 
-    // Create the import contexts for each base DN.
-    DN baseDN;
+      // Create the import contexts for each base DN.
+      DN baseDN;
 
-    for (EntryContainer entryContainer : rootContainer.getEntryContainers())
-    {
-      baseDN = entryContainer.getBaseDN();
-
-      // Create an import context.
-      ImportContext importContext = new ImportContext();
-      importContext.setBufferSize(bufferSize);
-      importContext.setConfig(config);
-      importContext.setLDIFImportConfig(this.ldifImportConfig);
-
-      importContext.setBaseDN(baseDN);
-      importContext.setContainerName(entryContainer.getContainerName());
-      importContext.setEntryContainer(entryContainer);
-      importContext.setBufferSize(bufferSize);
-
-      // Create an entry queue.
-      LinkedBlockingQueue<Entry> queue =
-           new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
-      importContext.setQueue(queue);
-
-      importMap.put(baseDN, importContext);
-    }
-
-    // Make a note of the time we started.
-    long startTime = System.currentTimeMillis();
-
-    try
-    {
-      // Create a temporary work directory.
-      File tempDir = new File(config.getImportTempDirectory());
-      tempDir.mkdir();
-      if (tempDir.listFiles() != null)
+      for (EntryContainer entryContainer : rootContainer.getEntryContainers())
       {
-        for (File f : tempDir.listFiles())
-        {
-          f.delete();
-        }
+        baseDN = entryContainer.getBaseDN();
+
+        // Create an import context.
+        ImportContext importContext = new ImportContext();
+        importContext.setBufferSize(bufferSize);
+        importContext.setConfig(config);
+        importContext.setLDIFImportConfig(this.ldifImportConfig);
+        importContext.setLDIFReader(reader);
+
+        importContext.setBaseDN(baseDN);
+        importContext.setContainerName(entryContainer.getContainerName());
+        importContext.setEntryContainer(entryContainer);
+        importContext.setBufferSize(bufferSize);
+
+        // Create an entry queue.
+        LinkedBlockingQueue<Entry> queue =
+             new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
+        importContext.setQueue(queue);
+
+        importMap.put(baseDN, importContext);
       }
 
+      // Make a note of the time we started.
+      startTime = System.currentTimeMillis();
+
       try
       {
-        importedCount = 0;
-        int     passNumber = 1;
-        boolean moreData   = true;
-        while (moreData)
+        // Create a temporary work directory.
+        File tempDir = new File(config.getImportTempDirectory());
+        tempDir.mkdir();
+        if (tempDir.listFiles() != null)
         {
-          moreData = processLDIF();
-          if (moreData)
+          for (File f : tempDir.listFiles())
           {
-            msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
-            message = getMessage(msgID, passNumber++);
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
+            f.delete();
           }
-          else
+        }
+
+        try
+        {
+          importedCount = 0;
+          int     passNumber = 1;
+          boolean moreData   = true;
+          while (moreData)
           {
-            msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
-            message = getMessage(msgID);
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
-          }
+            moreData = processLDIF();
+            if (moreData)
+            {
+              msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
+              message = getMessage(msgID, passNumber++);
+              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                       message, msgID);
+            }
+            else
+            {
+              msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
+              message = getMessage(msgID);
+              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                       message, msgID);
+            }
 
 
-          long mergeStartTime = System.currentTimeMillis();
-          merge();
-          long mergeEndTime = System.currentTimeMillis();
+            long mergeStartTime = System.currentTimeMillis();
+            merge();
+            long mergeEndTime = System.currentTimeMillis();
 
-          if (moreData)
-          {
-            msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
-            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
+            if (moreData)
+            {
+              msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
+              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                       message, msgID);
+            }
+            else
+            {
+              msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
+              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                       message, msgID);
+            }
           }
-          else
-          {
-            msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
-            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
-          }
+        }
+        finally
+        {
+          tempDir.delete();
         }
       }
       finally
       {
-        tempDir.delete();
+        rootContainer.close();
+
+        // Sync the environment to disk.
+        msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
+        message = getMessage(msgID);
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
       }
     }
     finally
     {
-      rootContainer.close();
-
-      // Sync the environment to disk.
-      msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
-      message = getMessage(msgID);
-      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-               message, msgID);
+      reader.close();
     }
 
     long finishTime = System.currentTimeMillis();
@@ -438,14 +453,14 @@
   }
 
   /**
-   * Open a reader for the LDIF file and create a set of worker threads, one
-   * set for each base DN. Read each entry from the LDIF and determine which
+   * Create a set of worker threads, one set for each base DN.
+   * Read each entry from the LDIF and determine which
    * base DN the entry belongs to. Write the dn2id database, then put the
    * entry on the appropriate queue for the worker threads to consume.
    * Record the entry count for each base DN when all entries have been
    * processed.
    *
-   * @return true if is more data to be read from the LDIF file (the import
+   * @return true if thre is more data to be read from the LDIF file (the import
    * pass size was reached), false if the entire LDIF file has been read.
    *
    * @throws JebException If an error occurs in the JE backend.
@@ -458,143 +473,124 @@
   {
     boolean moreData = false;
 
-    // Create an LDIF reader if necessary.
-    if (reader == null)
+    ArrayList<ImportThread> threads;
+
+    // Create one set of worker threads for each base DN.
+    int importThreadCount = config.getImportThreadCount();
+    threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
+    for (ImportContext ic : importMap.values())
     {
-      reader = new LDIFReader(ldifImportConfig);
-      for (ImportContext ic : importMap.values())
+      for (int i = 0; i < importThreadCount; i++)
       {
-        ic.setLDIFReader(reader);
+        ImportThread t = new ImportThread(ic, i);
+        t.setUncaughtExceptionHandler(this);
+        threads.add(t);
+
+        t.start();
       }
     }
 
-    ArrayList<ImportThread> threads;
     try
     {
-      // Create one set of worker threads for each base DN.
-      int importThreadCount = config.getImportThreadCount();
-      threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
-      for (ImportContext ic : importMap.values())
+      // Create a counter to use to determine whether we've hit the import
+      // pass size.
+      int entriesProcessed = 0;
+      int importPassSize   = config.getImportPassSize();
+      if (importPassSize <= 0)
       {
-        for (int i = 0; i < importThreadCount; i++)
-        {
-          ImportThread t = new ImportThread(ic, i);
-          t.setUncaughtExceptionHandler(this);
-          threads.add(t);
-
-          t.start();
-        }
+        importPassSize = Integer.MAX_VALUE;
       }
 
+      // Start a timer for the progress report.
+      Timer timer = new Timer();
+      TimerTask progressTask = new ImportJob.ProgressTask();
+      timer.scheduleAtFixedRate(progressTask, progressInterval,
+                                progressInterval);
+
       try
       {
-        // Create a counter to use to determine whether we've hit the import
-        // pass size.
-        int entriesProcessed = 0;
-        int importPassSize   = config.getImportPassSize();
-        if (importPassSize <= 0)
-        {
-          importPassSize = Integer.MAX_VALUE;
-        }
-
-        // Start a timer for the progress report.
-        Timer timer = new Timer();
-        TimerTask progressTask = new ImportJob.ProgressTask();
-        timer.scheduleAtFixedRate(progressTask, progressInterval,
-                                  progressInterval);
-
-        try
-        {
-          do
-          {
-            try
-            {
-              // Read the next entry.
-              Entry entry = reader.readEntry();
-
-              // Check for end of file.
-              if (entry == null)
-              {
-                break;
-              }
-
-              // Route it according to base DN.
-              ImportContext importContext = getImportConfig(entry.getDN());
-
-              processEntry(importContext, entry);
-
-              entriesProcessed++;
-              if (entriesProcessed >= importPassSize)
-              {
-                moreData = true;
-                break;
-              }
-            }
-            catch (LDIFException e)
-            {
-              if (debugEnabled())
-              {
-                debugCaught(DebugLogLevel.ERROR, e);
-              }
-            }
-            catch (DirectoryException e)
-            {
-              if (debugEnabled())
-              {
-                debugCaught(DebugLogLevel.ERROR, e);
-              }
-            }
-          } while (true);
-
-          // Wait for the queues to be drained.
-          for (ImportContext ic : importMap.values())
-          {
-            while (ic.getQueue().size() > 0)
-            {
-              try
-              {
-                Thread.sleep(100);
-              } catch (Exception e)
-              {
-                // No action needed.
-              }
-            }
-          }
-
-        }
-        finally
-        {
-          timer.cancel();
-        }
-      }
-      finally
-      {
-        // Order the threads to stop.
-        for (ImportThread t : threads)
-        {
-          t.stopProcessing();
-        }
-
-        // Wait for each thread to stop.
-        for (ImportThread t : threads)
+        do
         {
           try
           {
-            t.join();
-            importedCount += t.getImportedCount();
+            // Read the next entry.
+            Entry entry = reader.readEntry();
+
+            // Check for end of file.
+            if (entry == null)
+            {
+              break;
+            }
+
+            // Route it according to base DN.
+            ImportContext importContext = getImportConfig(entry.getDN());
+
+            processEntry(importContext, entry);
+
+            entriesProcessed++;
+            if (entriesProcessed >= importPassSize)
+            {
+              moreData = true;
+              break;
+            }
           }
-          catch (InterruptedException ie)
+          catch (LDIFException e)
           {
-            // No action needed?
+            if (debugEnabled())
+            {
+              debugCaught(DebugLogLevel.ERROR, e);
+            }
+          }
+          catch (DirectoryException e)
+          {
+            if (debugEnabled())
+            {
+              debugCaught(DebugLogLevel.ERROR, e);
+            }
+          }
+        } while (true);
+
+        // Wait for the queues to be drained.
+        for (ImportContext ic : importMap.values())
+        {
+          while (ic.getQueue().size() > 0)
+          {
+            try
+            {
+              Thread.sleep(100);
+            } catch (Exception e)
+            {
+              // No action needed.
+            }
           }
         }
+
+      }
+      finally
+      {
+        timer.cancel();
       }
     }
     finally
     {
-      if (! moreData)
+      // Order the threads to stop.
+      for (ImportThread t : threads)
       {
-        reader.close();
+        t.stopProcessing();
+      }
+
+      // Wait for each thread to stop.
+      for (ImportThread t : threads)
+      {
+        try
+        {
+          t.join();
+          importedCount += t.getImportedCount();
+        }
+        catch (InterruptedException ie)
+        {
+          // No action needed?
+        }
       }
     }
 

--
Gitblit v1.10.0