From 7eda83737e5c2a09bef758ac2bcd3b7ea8b32ce3 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Wed, 20 Jun 2007 18:27:41 +0000
Subject: [PATCH] This refactoring includes the following changes to the JE backend: - Extracted common interface DatabaseContainer from DN2ID, ID2Entry, etc... classes. - Moved database read and write methods from EntryContainer to DatabaseContainer. - Added index configuration to the XML based admin framework. - Removed redundant configuration objects (Config, IndexConfig). - Added exclusive/shared lock to EntryContainer. All access to an EntryContainer must acquire a lock before using the internal  DatabaseContainers or making configuration changes. - Added the ability to add/remove/modify indexes with the backend online. Server will issue rebuild required warning when adding new indexes  or sub-indexes (equality, substring, presence...). - Added the ability to change the index entry limit for both the backend and each index with the backend online. Server will issue rebuild  required warning if the previous limit has been exceeded. - Added the ability to change entry compression and index substring length setting while the backend is online. - Added a persistent state database to each EntryContainer to persist backend configuration between server restarts. Server will issue  rebuild required warning if a new index is added when the backend is offline. - Added a trusted flag to indexes so that non existent keys will not be interpreted as an empty entry ID set when an index is untrusted. An  index is untrusted when it is added to an non-empty EntryContainer or an inconsistency is detected. Server will issue warning on startup to  rebuild the index.  - Fixed a issue where the LDIF import process stops responding if the temporary import dir is full or unwritable. 

---
 opends/src/server/org/opends/server/backends/jeb/ImportJob.java |  328 +++++++++++++++++++++++++----------------------------
 1 files changed, 155 insertions(+), 173 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/ImportJob.java b/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
index 0dcb6a0..72b1c53 100644
--- a/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
+++ b/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -31,10 +31,8 @@
 import com.sleepycat.je.StatsConfig;
 import com.sleepycat.je.Transaction;
 
-import org.opends.server.api.Backend;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.messages.JebMessages;
-import org.opends.server.types.AttributeType;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
@@ -44,25 +42,29 @@
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.LDIFException;
 import org.opends.server.util.LDIFReader;
+import org.opends.server.util.StaticUtils;
+import static org.opends.server.util.StaticUtils.getFileForPath;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
 import static org.opends.server.messages.JebMessages.
-     MSGID_JEB_IMPORT_ENTRY_EXISTS;
+    MSGID_JEB_IMPORT_ENTRY_EXISTS;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.messages.JebMessages.
-     MSGID_JEB_IMPORT_PARENT_NOT_FOUND;
+    MSGID_JEB_IMPORT_PARENT_NOT_FOUND;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.server.messages.JebMessages.*;
+import org.opends.server.admin.std.server.JEBackendCfg;
 
 /**
  * Import from LDIF to a JE backend.
@@ -75,14 +77,9 @@
   private static final DebugTracer TRACER = getTracer();
 
   /**
-   * The backend instance we are importing into.
-   */
-  private Backend backend;
-
-  /**
    * The JE backend configuration.
    */
-  private Config config;
+  private JEBackendCfg config;
 
   /**
    * The root container used for this import job.
@@ -103,7 +100,7 @@
    * Map of base DNs to their import context.
    */
   private HashMap<DN,ImportContext> importMap =
-       new HashMap<DN, ImportContext>();
+      new HashMap<DN, ImportContext>();
 
   /**
    * The number of entries imported.
@@ -116,19 +113,21 @@
    */
   private long progressInterval = 10000;
 
+
+  /**
+   * The import worker threads.
+   */
+  private CopyOnWriteArrayList<ImportThread> threads;
+
   /**
    * Create a new import job.
    *
-   * @param backend The backend performing the import.
-   * @param config The backend configuration.
    * @param ldifImportConfig The LDIF import configuration.
    */
-  public ImportJob(Backend backend, Config config,
-                   LDIFImportConfig ldifImportConfig)
+  public ImportJob(LDIFImportConfig ldifImportConfig)
   {
-    this.backend = backend;
-    this.config = config;
     this.ldifImportConfig = ldifImportConfig;
+    this.threads = new CopyOnWriteArrayList<ImportThread>();
   }
 
   /**
@@ -138,61 +137,32 @@
    * processes the LDIF file, then merges the resulting intermediate
    * files to load the index databases.
    *
+   * @param rootContainer The root container to import into.
    * @throws DatabaseException If an error occurs in the JE database.
    * @throws IOException  If a problem occurs while opening the LDIF file for
    *                      reading, or while reading from the LDIF file.
    * @throws JebException If an error occurs in the JE backend.
    */
-  public void importLDIF()
+  public void importLDIF(RootContainer rootContainer)
       throws DatabaseException, IOException, JebException
   {
-/*
-    envConfig.setConfigParam("je.env.runCleaner", "false");
-    envConfig.setConfigParam("je.log.numBuffers", "2");
-    envConfig.setConfigParam("je.log.bufferSize", "15000000");
-    envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
-    envConfig.setConfigParam("je.log.fileMax", "100000000");
-*/
 
     // Create an LDIF reader. Throws an exception if the file does not exist.
     reader = new LDIFReader(ldifImportConfig);
+    this.rootContainer = rootContainer;
+    this.config = rootContainer.getConfiguration();
 
     int msgID;
     String message;
     long startTime;
+
     try
     {
-      rootContainer = new RootContainer(config, backend);
-      if (ldifImportConfig.appendToExistingData())
-      {
-        rootContainer.open(config.getBackendDirectory(),
-                           config.getBackendPermission(),
-                           false, true, true, true, true, false);
-      }
-      else
-      {
-        rootContainer.open(config.getBackendDirectory(),
-                           config.getBackendPermission(),
-                           false, true, false, false, false, false);
-      }
-
-      if (!ldifImportConfig.appendToExistingData())
-      {
-        // We have the writer lock on the environment, now delete the
-        // environment and re-open it. Only do this when we are
-        // importing to all the base DNs in the backend.
-        rootContainer.close();
-        EnvManager.removeFiles(config.getBackendDirectory().getPath());
-        rootContainer.open(config.getBackendDirectory(),
-                           config.getBackendPermission(),
-                           false, true, false, false, false, false);
-      }
-
       // Divide the total buffer size by the number of threads
       // and give that much to each thread.
-      int importThreadCount = config.getImportThreadCount();
-      long bufferSize = config.getImportBufferSize() /
-           (importThreadCount*config.getBaseDNs().length);
+      int importThreadCount = config.getBackendImportThreadCount();
+      long bufferSize = config.getBackendImportBufferSize() /
+          (importThreadCount*rootContainer.getBaseDNs().size());
 
       msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
       message = getMessage(msgID, importThreadCount);
@@ -211,10 +181,7 @@
                message, msgID);
 
       TRACER.debugInfo(
-        rootContainer.getEnvironmentConfig().toString());
-
-
-      rootContainer.openEntryContainers(config.getBaseDNs());
+          rootContainer.getEnvironmentConfig().toString());
 
       // Create the import contexts for each base DN.
       DN baseDN;
@@ -237,7 +204,7 @@
 
         // Create an entry queue.
         LinkedBlockingQueue<Entry> queue =
-             new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
+            new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
         importContext.setQueue(queue);
 
         importMap.put(baseDN, importContext);
@@ -246,77 +213,70 @@
       // Make a note of the time we started.
       startTime = System.currentTimeMillis();
 
+      // Create a temporary work directory.
+      File tempDir = getFileForPath(config.getBackendImportTempDirectory());
+      if(!tempDir.exists() && !tempDir.mkdir())
+      {
+        msgID = MSGID_JEB_IMPORT_CREATE_TMPDIR_ERROR;
+        String msg = getMessage(msgID, tempDir);
+        throw new IOException(msg);
+      }
+
+      if (tempDir.listFiles() != null)
+      {
+        for (File f : tempDir.listFiles())
+        {
+          f.delete();
+        }
+      }
+
       try
       {
-        // Create a temporary work directory.
-        File tempDir = new File(config.getImportTempDirectory());
-        tempDir.mkdir();
-        if (tempDir.listFiles() != null)
+        importedCount = 0;
+        int     passNumber = 1;
+        boolean moreData   = true;
+        while (moreData)
         {
-          for (File f : tempDir.listFiles())
+          moreData = processLDIF();
+          if (moreData)
           {
-            f.delete();
+            msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
+            message = getMessage(msgID, passNumber++);
+            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                     message, msgID);
           }
-        }
-
-        try
-        {
-          importedCount = 0;
-          int     passNumber = 1;
-          boolean moreData   = true;
-          while (moreData)
+          else
           {
-            moreData = processLDIF();
-            if (moreData)
-            {
-              msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
-              message = getMessage(msgID, passNumber++);
-              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                       message, msgID);
-            }
-            else
-            {
-              msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
-              message = getMessage(msgID);
-              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                       message, msgID);
-            }
-
-
-            long mergeStartTime = System.currentTimeMillis();
-            merge();
-            long mergeEndTime = System.currentTimeMillis();
-
-            if (moreData)
-            {
-              msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
-              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
-              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                       message, msgID);
-            }
-            else
-            {
-              msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
-              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
-              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                       message, msgID);
-            }
+            msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
+            message = getMessage(msgID);
+            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                     message, msgID);
           }
-        }
-        finally
-        {
-          tempDir.delete();
+
+
+          long mergeStartTime = System.currentTimeMillis();
+          merge();
+          long mergeEndTime = System.currentTimeMillis();
+
+          if (moreData)
+          {
+            msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
+            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                     message, msgID);
+          }
+          else
+          {
+            msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
+            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                     message, msgID);
+          }
         }
       }
       finally
       {
-        rootContainer.close();
-
-        // Sync the environment to disk.
-        msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
-        message = getMessage(msgID);
-        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                 message, msgID);
+        tempDir.delete();
       }
     }
     finally
@@ -351,8 +311,6 @@
    */
   public void merge()
   {
-    Map<AttributeType,IndexConfig>
-         indexConfigs = config.getIndexConfigMap();
     ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
 
     // Create merge threads for each base DN.
@@ -362,73 +320,77 @@
       EntryContainer entryContainer = importContext.getEntryContainer();
 
       // For each configured attribute index.
-      for (IndexConfig indexConfig : indexConfigs.values())
+      for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
       {
-        AttributeIndex attrIndex =
-             entryContainer.getAttributeIndex(indexConfig.getAttributeType());
-        if (indexConfig.isEqualityIndex())
+        int indexEntryLimit = config.getBackendIndexEntryLimit();
+        if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
+        {
+          indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
+        }
+
+        if (attrIndex.equalityIndex != null)
         {
           Index index = attrIndex.equalityIndex;
-          String name = containerName + "_" + index.toString();
           IndexMergeThread indexMergeThread =
-               new IndexMergeThread(name, config, ldifImportConfig, index,
-                                    indexConfig.getEqualityEntryLimit());
+              new IndexMergeThread(config,
+                                   ldifImportConfig, index,
+                                   indexEntryLimit);
           mergers.add(indexMergeThread);
         }
-        if (indexConfig.isPresenceIndex())
+        if (attrIndex.presenceIndex != null)
         {
           Index index = attrIndex.presenceIndex;
-          String name = containerName + "_" + index.toString();
           IndexMergeThread indexMergeThread =
-               new IndexMergeThread(name, config, ldifImportConfig, index,
-                                    indexConfig.getPresenceEntryLimit());
+              new IndexMergeThread(config,
+                                   ldifImportConfig, index,
+                                   indexEntryLimit);
           mergers.add(indexMergeThread);
         }
-        if (indexConfig.isSubstringIndex())
+        if (attrIndex.substringIndex != null)
         {
           Index index = attrIndex.substringIndex;
-          String name = containerName + "_" + index.toString();
           IndexMergeThread indexMergeThread =
-               new IndexMergeThread(name, config, ldifImportConfig, index,
-                                    indexConfig.getSubstringEntryLimit());
+              new IndexMergeThread(config,
+                                   ldifImportConfig, index,
+                                   indexEntryLimit);
           mergers.add(indexMergeThread);
         }
-        if (indexConfig.isOrderingIndex())
+        if (attrIndex.orderingIndex != null)
         {
           Index index = attrIndex.orderingIndex;
-          String name = containerName + "_" + index.toString();
           IndexMergeThread indexMergeThread =
-               new IndexMergeThread(name, config, ldifImportConfig, index,
-                                    indexConfig.getEqualityEntryLimit());
+              new IndexMergeThread(config,
+                                   ldifImportConfig, index,
+                                   indexEntryLimit);
           mergers.add(indexMergeThread);
         }
-        if (indexConfig.isApproximateIndex())
+        if (attrIndex.approximateIndex != null)
         {
           Index index = attrIndex.approximateIndex;
-          String name = containerName + "_" + index.toString();
           IndexMergeThread indexMergeThread =
-              new IndexMergeThread(name, config, ldifImportConfig, index,
-                                   indexConfig.getEqualityEntryLimit());
+              new IndexMergeThread(config,
+                                   ldifImportConfig, index,
+                                   indexEntryLimit);
           mergers.add(indexMergeThread);
         }
       }
 
       // Id2Children index.
       Index id2Children = entryContainer.getID2Children();
-      String name = containerName + "_" + id2Children.toString();
       IndexMergeThread indexMergeThread =
-           new IndexMergeThread(name, config, ldifImportConfig,
-                                id2Children,
-                                config.getBackendIndexEntryLimit());
+          new IndexMergeThread(config,
+                               ldifImportConfig,
+                               id2Children,
+                               config.getBackendIndexEntryLimit());
       mergers.add(indexMergeThread);
 
       // Id2Subtree index.
       Index id2Subtree = entryContainer.getID2Subtree();
-      name = containerName + "_" + id2Subtree.toString();
       indexMergeThread =
-           new IndexMergeThread(name, config, ldifImportConfig,
-                           id2Subtree,
-                           config.getBackendIndexEntryLimit());
+          new IndexMergeThread(config,
+                               ldifImportConfig,
+                               id2Subtree,
+                               config.getBackendIndexEntryLimit());
       mergers.add(indexMergeThread);
     }
 
@@ -472,15 +434,12 @@
    *                       reading, or while reading from the LDIF file.
    */
   private boolean processLDIF()
-          throws JebException, DatabaseException, IOException
+      throws JebException, DatabaseException, IOException
   {
     boolean moreData = false;
 
-    ArrayList<ImportThread> threads;
-
     // Create one set of worker threads for each base DN.
-    int importThreadCount = config.getImportThreadCount();
-    threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
+    int importThreadCount = config.getBackendImportThreadCount();
     for (ImportContext ic : importMap.values())
     {
       for (int i = 0; i < importThreadCount; i++)
@@ -498,7 +457,7 @@
       // Create a counter to use to determine whether we've hit the import
       // pass size.
       int entriesProcessed = 0;
-      int importPassSize   = config.getImportPassSize();
+      int importPassSize   = config.getBackendImportPassSize();
       if (importPassSize <= 0)
       {
         importPassSize = Integer.MAX_VALUE;
@@ -514,6 +473,13 @@
       {
         do
         {
+          if(threads.size() <= 0)
+          {
+            int msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+            String msg = getMessage(msgID);
+            throw new JebException(msgID, msg);
+          }
+
           try
           {
             // Read the next entry.
@@ -553,17 +519,20 @@
           }
         } while (true);
 
-        // Wait for the queues to be drained.
-        for (ImportContext ic : importMap.values())
+        if(threads.size() > 0)
         {
-          while (ic.getQueue().size() > 0)
+          // Wait for the queues to be drained.
+          for (ImportContext ic : importMap.values())
           {
-            try
+            while (ic.getQueue().size() > 0)
             {
-              Thread.sleep(100);
-            } catch (Exception e)
-            {
-              // No action needed.
+              try
+              {
+                Thread.sleep(100);
+              } catch (Exception e)
+              {
+                // No action needed.
+              }
             }
           }
         }
@@ -612,7 +581,7 @@
    * @throws JebException If an error occurs in the JE backend.
    */
   public void processEntry(ImportContext importContext, Entry entry)
-       throws JebException, DatabaseException
+      throws JebException, DatabaseException
   {
     DN entryDN = entry.getDN();
     LDIFImportConfig ldifImportConfig = importContext.getLDIFImportConfig();
@@ -634,7 +603,7 @@
       {
         // See if we are allowed to replace the entry that exists.
         if (ldifImportConfig.appendToExistingData() &&
-             ldifImportConfig.replaceExistingEntries())
+            ldifImportConfig.replaceExistingEntries())
         {
           // Read the existing entry contents.
           Entry oldEntry = id2entry.get(txn, entryID);
@@ -672,7 +641,7 @@
         // Make sure the parent entry exists, unless this entry is a base DN.
         EntryID parentID = null;
         DN parentDN = importContext.getEntryContainer().
-             getParentWithinBase(entryDN);
+            getParentWithinBase(entryDN);
         if (parentDN != null)
         {
           parentID = dn2id.get(txn, parentDN);
@@ -725,7 +694,15 @@
         // Put the entry on the queue.
         try
         {
-          importContext.getQueue().put(entry);
+          while(!importContext.getQueue().offer(entry, 1000,
+                                                TimeUnit.MILLISECONDS))
+          {
+            if(threads.size() <= 0)
+            {
+              // All worker threads died. We must stop now.
+              return;
+            }
+          }
         }
         catch (InterruptedException e)
         {
@@ -775,7 +752,12 @@
    */
   public void uncaughtException(Thread t, Throwable e)
   {
-    e.printStackTrace();
+    threads.remove(t);
+    int msgID = MSGID_JEB_IMPORT_THREAD_EXCEPTION;
+    String msg = getMessage(msgID, t.getName(),
+                            StaticUtils.stackTraceToSingleLineString(e));
+    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, msg,
+             msgID);
   }
 
   /**
@@ -883,7 +865,7 @@
         EnvironmentStats envStats =
             rootContainer.getEnvironmentStats(new StatsConfig());
         long nCacheMiss =
-             envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+            envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
 
         float cacheMissRate = 0;
         if (deltaCount > 0)

--
Gitblit v1.10.0