From 45e94f0f208c47806df0e8ae073c91f797f812a4 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Thu, 26 Jul 2007 16:31:34 +0000
Subject: [PATCH] This fixes issue 1971 and allows partial non-append imports of a backend:

---
 opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java |  896 +++++++++++++++++++++++++++++++++++++++++------------------
 1 files changed, 620 insertions(+), 276 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
index 8e077a1..b06c2c2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -26,10 +26,7 @@
  */
 package org.opends.server.backends.jeb;
 
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.EnvironmentStats;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
 
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.messages.JebMessages;
@@ -48,10 +45,7 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
@@ -66,6 +60,8 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.server.messages.JebMessages.*;
 import org.opends.server.admin.std.server.JEBackendCfg;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.config.ConfigException;
 
 /**
  * Import from LDIF to a JE backend.
@@ -108,12 +104,30 @@
    */
   private int importedCount;
 
+  /**
+   * The number of entries migrated.
+   */
+  private int migratedCount;
+
+  /**
+   * The number of merge passes.
+   */
+  int mergePassNumber = 1;
+
 
   /**
    * The number of milliseconds between job progress reports.
    */
   private long progressInterval = 10000;
 
+  /**
+   * The progress report timer.
+   */
+  private Timer timer;
+
+  private int entriesProcessed;
+  private int importPassSize;
+
 
   /**
    * The import worker threads.
@@ -146,15 +160,25 @@
    * @throws IOException  If a problem occurs while opening the LDIF file for
    *                      reading, or while reading from the LDIF file.
    * @throws JebException If an error occurs in the JE backend.
+   * @throws DirectoryException if a directory server related error occurs.
+   * @throws ConfigException if a configuration related error occurs.
    */
   public LDIFImportResult importLDIF(RootContainer rootContainer)
-      throws DatabaseException, IOException, JebException
+      throws DatabaseException, IOException, JebException, DirectoryException,
+             ConfigException
   {
 
     // Create an LDIF reader. Throws an exception if the file does not exist.
     reader = new LDIFReader(ldifImportConfig);
     this.rootContainer = rootContainer;
     this.config = rootContainer.getConfiguration();
+    this.mergePassNumber = 1;
+    this.entriesProcessed = 0;
+    this.importPassSize   = config.getBackendImportPassSize();
+    if (importPassSize <= 0)
+    {
+      importPassSize = Integer.MAX_VALUE;
+    }
 
     int msgID;
     String message;
@@ -185,31 +209,15 @@
         TRACER.debugInfo(message);
       }
 
-      // Create the import contexts for each base DN.
-      DN baseDN;
-
       for (EntryContainer entryContainer : rootContainer.getEntryContainers())
       {
-        baseDN = entryContainer.getBaseDN();
+        ImportContext importContext =
+            getImportContext(entryContainer, bufferSize);
 
-        // Create an import context.
-        ImportContext importContext = new ImportContext();
-        importContext.setBufferSize(bufferSize);
-        importContext.setConfig(config);
-        importContext.setLDIFImportConfig(this.ldifImportConfig);
-        importContext.setLDIFReader(reader);
-
-        importContext.setBaseDN(baseDN);
-        importContext.setContainerName(entryContainer.getContainerName());
-        importContext.setEntryContainer(entryContainer);
-        importContext.setBufferSize(bufferSize);
-
-        // Create an entry queue.
-        LinkedBlockingQueue<Entry> queue =
-            new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
-        importContext.setQueue(queue);
-
-        importMap.put(baseDN, importContext);
+        if(importContext != null)
+        {
+          importMap.put(entryContainer.getBaseDN(), importContext);
+        }
       }
 
       // Make a note of the time we started.
@@ -232,53 +240,57 @@
         }
       }
 
+      startWorkerThreads();
       try
       {
         importedCount = 0;
-        int     passNumber = 1;
-        boolean moreData   = true;
-        while (moreData)
-        {
-          moreData = processLDIF();
-          if (moreData)
-          {
-            msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
-            message = getMessage(msgID, passNumber++);
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
-          }
-          else
-          {
-            msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
-            message = getMessage(msgID);
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
-          }
-
-
-          long mergeStartTime = System.currentTimeMillis();
-          merge();
-          long mergeEndTime = System.currentTimeMillis();
-
-          if (moreData)
-          {
-            msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
-            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
-          }
-          else
-          {
-            msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
-            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
-            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
-                     message, msgID);
-          }
-        }
+        migratedCount = 0;
+        migrateExistingEntries();
+        processLDIF();
+        migrateExcludedEntries();
       }
       finally
       {
+        merge(false);
         tempDir.delete();
+
+        for(ImportContext importContext : importMap.values())
+        {
+          DN baseDN = importContext.getBaseDN();
+          EntryContainer srcEntryContainer =
+              importContext.getSrcEntryContainer();
+          if(srcEntryContainer != null)
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugInfo("Deleteing old entry container for base DN " +
+                  "%s and renaming temp entry container", baseDN);
+            }
+            EntryContainer unregEC =
+              rootContainer.unregisterEntryContainer(baseDN);
+            //Make sure the unregistered EC for the base DN is the same as
+            //the one in the import context.
+            if(unregEC != srcEntryContainer)
+            {
+              if(debugEnabled())
+              {
+                TRACER.debugInfo("Current entry container used for base DN " +
+                    "%s is not the same as the source entry container used " +
+                    "during the migration process.", baseDN);
+              }
+              rootContainer.registerEntryContainer(baseDN, unregEC);
+              continue;
+            }
+            srcEntryContainer.exclusiveLock.lock();
+            srcEntryContainer.delete();
+            srcEntryContainer.exclusiveLock.unlock();
+            EntryContainer newEC = importContext.getEntryContainer();
+            newEC.exclusiveLock.lock();
+            newEC.setDatabasePrefix(baseDN.toNormalizedString());
+            newEC.exclusiveLock.unlock();
+            rootContainer.registerEntryContainer(baseDN, newEC);
+          }
+        }
       }
     }
     finally
@@ -296,9 +308,11 @@
     }
 
     msgID = MSGID_JEB_IMPORT_FINAL_STATUS;
-    message = getMessage(msgID, reader.getEntriesRead(), importedCount,
+    message = getMessage(msgID, reader.getEntriesRead(),
+                         importedCount - migratedCount,
                          reader.getEntriesIgnored(),
-                         reader.getEntriesRejected(), importTime/1000, rate);
+                         reader.getEntriesRejected(),
+                         migratedCount, importTime/1000, rate);
     logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
              message, msgID);
 
@@ -314,135 +328,169 @@
 
   /**
    * Merge the intermediate files to load the index databases.
+   *
+   * @param moreData <CODE>true</CODE> if this is a intermediate merge or
+   * <CODE>false</CODE> if this is a final merge.
+   * @throws DatabaseException If an error occurs in the JE database.
    */
-  public void merge()
+  private void merge(boolean moreData) throws DatabaseException
   {
-    ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
+    stopWorkerThreads();
 
-    // Create merge threads for each base DN.
-    for (ImportContext importContext : importMap.values())
+    try
     {
-      EntryContainer entryContainer = importContext.getEntryContainer();
-
-      // For each configured attribute index.
-      for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
+      if (moreData)
       {
-        int indexEntryLimit = config.getBackendIndexEntryLimit();
-        if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
+        int msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
+        String message = getMessage(msgID, mergePassNumber++);
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
+      }
+      else
+      {
+        int msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
+        String message = getMessage(msgID);
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
+      }
+
+
+      long mergeStartTime = System.currentTimeMillis();
+
+      ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
+
+      // Create merge threads for each base DN.
+      for (ImportContext importContext : importMap.values())
+      {
+        EntryContainer entryContainer = importContext.getEntryContainer();
+
+        // For each configured attribute index.
+        for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
         {
-          indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
+          int indexEntryLimit = config.getBackendIndexEntryLimit();
+          if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
+          {
+            indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
+          }
+
+          if (attrIndex.equalityIndex != null)
+          {
+            Index index = attrIndex.equalityIndex;
+            IndexMergeThread indexMergeThread =
+                new IndexMergeThread(config,
+                                     ldifImportConfig, index,
+                                     indexEntryLimit);
+            mergers.add(indexMergeThread);
+          }
+          if (attrIndex.presenceIndex != null)
+          {
+            Index index = attrIndex.presenceIndex;
+            IndexMergeThread indexMergeThread =
+                new IndexMergeThread(config,
+                                     ldifImportConfig, index,
+                                     indexEntryLimit);
+            mergers.add(indexMergeThread);
+          }
+          if (attrIndex.substringIndex != null)
+          {
+            Index index = attrIndex.substringIndex;
+            IndexMergeThread indexMergeThread =
+                new IndexMergeThread(config,
+                                     ldifImportConfig, index,
+                                     indexEntryLimit);
+            mergers.add(indexMergeThread);
+          }
+          if (attrIndex.orderingIndex != null)
+          {
+            Index index = attrIndex.orderingIndex;
+            IndexMergeThread indexMergeThread =
+                new IndexMergeThread(config,
+                                     ldifImportConfig, index,
+                                     indexEntryLimit);
+            mergers.add(indexMergeThread);
+          }
+          if (attrIndex.approximateIndex != null)
+          {
+            Index index = attrIndex.approximateIndex;
+            IndexMergeThread indexMergeThread =
+                new IndexMergeThread(config,
+                                     ldifImportConfig, index,
+                                     indexEntryLimit);
+            mergers.add(indexMergeThread);
+          }
         }
 
-        if (attrIndex.equalityIndex != null)
+        // Id2Children index.
+        Index id2Children = entryContainer.getID2Children();
+        IndexMergeThread indexMergeThread =
+            new IndexMergeThread(config,
+                                 ldifImportConfig,
+                                 id2Children,
+                                 config.getBackendIndexEntryLimit());
+        mergers.add(indexMergeThread);
+
+        // Id2Subtree index.
+        Index id2Subtree = entryContainer.getID2Subtree();
+        indexMergeThread =
+            new IndexMergeThread(config,
+                                 ldifImportConfig,
+                                 id2Subtree,
+                                 config.getBackendIndexEntryLimit());
+        mergers.add(indexMergeThread);
+      }
+
+      // Run all the merge threads.
+      for (IndexMergeThread imt : mergers)
+      {
+        imt.start();
+      }
+
+      // Wait for the threads to finish.
+      for (IndexMergeThread imt : mergers)
+      {
+        try
         {
-          Index index = attrIndex.equalityIndex;
-          IndexMergeThread indexMergeThread =
-              new IndexMergeThread(config,
-                                   ldifImportConfig, index,
-                                   indexEntryLimit);
-          mergers.add(indexMergeThread);
+          imt.join();
         }
-        if (attrIndex.presenceIndex != null)
+        catch (InterruptedException e)
         {
-          Index index = attrIndex.presenceIndex;
-          IndexMergeThread indexMergeThread =
-              new IndexMergeThread(config,
-                                   ldifImportConfig, index,
-                                   indexEntryLimit);
-          mergers.add(indexMergeThread);
-        }
-        if (attrIndex.substringIndex != null)
-        {
-          Index index = attrIndex.substringIndex;
-          IndexMergeThread indexMergeThread =
-              new IndexMergeThread(config,
-                                   ldifImportConfig, index,
-                                   indexEntryLimit);
-          mergers.add(indexMergeThread);
-        }
-        if (attrIndex.orderingIndex != null)
-        {
-          Index index = attrIndex.orderingIndex;
-          IndexMergeThread indexMergeThread =
-              new IndexMergeThread(config,
-                                   ldifImportConfig, index,
-                                   indexEntryLimit);
-          mergers.add(indexMergeThread);
-        }
-        if (attrIndex.approximateIndex != null)
-        {
-          Index index = attrIndex.approximateIndex;
-          IndexMergeThread indexMergeThread =
-              new IndexMergeThread(config,
-                                   ldifImportConfig, index,
-                                   indexEntryLimit);
-          mergers.add(indexMergeThread);
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
         }
       }
 
-      // Id2Children index.
-      Index id2Children = entryContainer.getID2Children();
-      IndexMergeThread indexMergeThread =
-          new IndexMergeThread(config,
-                               ldifImportConfig,
-                               id2Children,
-                               config.getBackendIndexEntryLimit());
-      mergers.add(indexMergeThread);
+      long mergeEndTime = System.currentTimeMillis();
 
-      // Id2Subtree index.
-      Index id2Subtree = entryContainer.getID2Subtree();
-      indexMergeThread =
-          new IndexMergeThread(config,
-                               ldifImportConfig,
-                               id2Subtree,
-                               config.getBackendIndexEntryLimit());
-      mergers.add(indexMergeThread);
-    }
-
-    // Run all the merge threads.
-    for (IndexMergeThread imt : mergers)
-    {
-      imt.start();
-    }
-
-    // Wait for the threads to finish.
-    for (IndexMergeThread imt : mergers)
-    {
-      try
+      if (moreData)
       {
-        imt.join();
+        int msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
+        String message =
+            getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
       }
-      catch (InterruptedException e)
+      else
       {
-        if (debugEnabled())
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
+        int msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
+        String message =
+            getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
+      }
+    }
+    finally
+    {
+      if(moreData)
+      {
+        startWorkerThreads();
       }
     }
   }
 
-  /**
-   * Create a set of worker threads, one set for each base DN.
-   * Read each entry from the LDIF and determine which
-   * base DN the entry belongs to. Write the dn2id database, then put the
-   * entry on the appropriate queue for the worker threads to consume.
-   * Record the entry count for each base DN when all entries have been
-   * processed.
-   *
-   * @return true if thre is more data to be read from the LDIF file (the import
-   * pass size was reached), false if the entire LDIF file has been read.
-   *
-   * @throws JebException If an error occurs in the JE backend.
-   * @throws DatabaseException If an error occurs in the JE database.
-   * @throws  IOException  If a problem occurs while opening the LDIF file for
-   *                       reading, or while reading from the LDIF file.
-   */
-  private boolean processLDIF()
-      throws JebException, DatabaseException, IOException
+  private void startWorkerThreads() throws DatabaseException
   {
-    boolean moreData = false;
-
     // Create one set of worker threads for each base DN.
     int importThreadCount = config.getBackendImportThreadCount();
     for (ImportContext ic : importMap.values())
@@ -457,122 +505,291 @@
       }
     }
 
-    try
+    // Start a timer for the progress report.
+    timer = new Timer();
+    TimerTask progressTask = new ImportJob.ProgressTask();
+    timer.scheduleAtFixedRate(progressTask, progressInterval,
+                              progressInterval);
+  }
+
+  private void stopWorkerThreads()
+  {
+    if(threads.size() > 0)
     {
-      // Create a counter to use to determine whether we've hit the import
-      // pass size.
-      int entriesProcessed = 0;
-      int importPassSize   = config.getBackendImportPassSize();
-      if (importPassSize <= 0)
+      // Wait for the queues to be drained.
+      for (ImportContext ic : importMap.values())
       {
-        importPassSize = Integer.MAX_VALUE;
-      }
-
-      // Start a timer for the progress report.
-      Timer timer = new Timer();
-      TimerTask progressTask = new ImportJob.ProgressTask();
-      timer.scheduleAtFixedRate(progressTask, progressInterval,
-                                progressInterval);
-
-      try
-      {
-        do
+        while (ic.getQueue().size() > 0)
         {
-          if(threads.size() <= 0)
-          {
-            int msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
-            String msg = getMessage(msgID);
-            throw new JebException(msgID, msg);
-          }
-
           try
           {
-            // Read the next entry.
-            Entry entry = reader.readEntry();
-
-            // Check for end of file.
-            if (entry == null)
-            {
-              break;
-            }
-
-            // Route it according to base DN.
-            ImportContext importContext = getImportConfig(entry.getDN());
-
-            processEntry(importContext, entry);
-
-            entriesProcessed++;
-            if (entriesProcessed >= importPassSize)
-            {
-              moreData = true;
-              break;
-            }
-          }
-          catch (LDIFException e)
+            Thread.sleep(100);
+          } catch (Exception e)
           {
-            if (debugEnabled())
-            {
-              TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            }
+            // No action needed.
           }
-          catch (DirectoryException e)
-          {
-            if (debugEnabled())
-            {
-              TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            }
-          }
-        } while (true);
+        }
+      }
+    }
 
-        if(threads.size() > 0)
+    // Order the threads to stop.
+    for (ImportThread t : threads)
+    {
+      t.stopProcessing();
+    }
+
+    // Wait for each thread to stop.
+    for (ImportThread t : threads)
+    {
+      try
+      {
+        t.join();
+        importedCount += t.getImportedCount();
+      }
+      catch (InterruptedException ie)
+      {
+        // No action needed?
+      }
+    }
+
+    timer.cancel();
+  }
+
+  /**
+   * Create a set of worker threads, one set for each base DN.
+   * Read each entry from the LDIF and determine which
+   * base DN the entry belongs to. Write the dn2id database, then put the
+   * entry on the appropriate queue for the worker threads to consume.
+   * Record the entry count for each base DN when all entries have been
+   * processed.
+   *
+   * pass size was reached), false if the entire LDIF file has been read.
+   *
+   * @throws JebException If an error occurs in the JE backend.
+   * @throws DatabaseException If an error occurs in the JE database.
+   * @throws  IOException  If a problem occurs while opening the LDIF file for
+   *                       reading, or while reading from the LDIF file.
+   */
+  private void processLDIF()
+      throws JebException, DatabaseException, IOException
+  {
+    int msgID = MSGID_JEB_IMPORT_LDIF_START;
+    String message = getMessage(msgID);
+    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+             message, msgID);
+
+    do
+    {
+      if(threads.size() <= 0)
+      {
+        msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+        message = getMessage(msgID);
+        throw new JebException(msgID, message);
+      }
+      try
+      {
+        // Read the next entry.
+        Entry entry = reader.readEntry();
+
+        // Check for end of file.
+        if (entry == null)
         {
-          // Wait for the queues to be drained.
-          for (ImportContext ic : importMap.values())
+          msgID = MSGID_JEB_IMPORT_LDIF_END;
+          message = getMessage(msgID);
+          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                   message, msgID);
+
+          break;
+        }
+
+        // Route it according to base DN.
+        ImportContext importContext = getImportConfig(entry.getDN());
+
+        processEntry(importContext, entry);
+
+        entriesProcessed++;
+        if (entriesProcessed >= importPassSize)
+        {
+          merge(false);
+          entriesProcessed = 0;
+        }
+      }
+      catch (LDIFException e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+      }
+      catch (DirectoryException e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+      }
+    } while (true);
+  }
+
+  private void migrateExistingEntries()
+      throws JebException, DatabaseException, DirectoryException
+  {
+    for(ImportContext importContext : importMap.values())
+    {
+      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
+      if(srcEntryContainer != null &&
+          !importContext.getIncludeBranches().isEmpty())
+      {
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+        LockMode lockMode = LockMode.DEFAULT;
+        OperationStatus status;
+
+        int msgID = MSGID_JEB_IMPORT_MIGRATION_START;
+        String message = getMessage(msgID, "existing",
+                                    importContext.getBaseDN());
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
+
+        Cursor cursor =
+            srcEntryContainer.getDN2ID().openCursor(null,
+                                                   CursorConfig.READ_COMMITTED);
+        try
+        {
+          status = cursor.getFirst(key, data, lockMode);
+
+          while(status == OperationStatus.SUCCESS)
           {
-            while (ic.getQueue().size() > 0)
+            if(threads.size() <= 0)
             {
-              try
+              msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+              message = getMessage(msgID);
+              throw new JebException(msgID, message);
+            }
+
+            DN dn = DN.decode(new ASN1OctetString(key.getData()));
+            if(!importContext.getIncludeBranches().contains(dn))
+            {
+              EntryID id = new EntryID(data);
+              Entry entry = srcEntryContainer.getID2Entry().get(null, id);
+              processEntry(importContext, entry);
+
+              entriesProcessed++;
+              migratedCount++;
+              if (entriesProcessed >= importPassSize)
               {
-                Thread.sleep(100);
-              } catch (Exception e)
+                merge(true);
+                entriesProcessed = 0;
+              }
+              status = cursor.getNext(key, data, lockMode);
+            }
+            else
+            {
+              // This is the base entry for a branch that will be included
+              // in the import so we don't want to copy the branch to the new
+              // entry container.
+
+              /**
+               * Advance the cursor to next entry at the same level in the DIT
+               * skipping all the entries in this branch.
+               * Set the next starting value to a value of equal length but
+               * slightly greater than the previous DN. Since keys are compared
+               * in reverse order we must set the first byte (the comma).
+               * No possibility of overflow here.
+               */
+              byte[] begin =
+                  StaticUtils.getBytes("," + dn.toNormalizedString());
+              begin[0] = (byte) (begin[0] + 1);
+              key.setData(begin);
+              status = cursor.getSearchKeyRange(key, data, lockMode);
+            }
+          }
+        }
+        finally
+        {
+          cursor.close();
+        }
+      }
+    }
+  }
+
+  private void migrateExcludedEntries()
+      throws JebException, DatabaseException
+  {
+    for(ImportContext importContext : importMap.values())
+    {
+      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
+      if(srcEntryContainer != null &&
+          !importContext.getExcludeBranches().isEmpty())
+      {
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+        LockMode lockMode = LockMode.DEFAULT;
+        OperationStatus status;
+
+        int msgID = MSGID_JEB_IMPORT_MIGRATION_START;
+        String message = getMessage(msgID, "excluded",
+                                    importContext.getBaseDN());
+        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+                 message, msgID);
+
+        Cursor cursor =
+            srcEntryContainer.getDN2ID().openCursor(null,
+                                                   CursorConfig.READ_COMMITTED);
+        Comparator<byte[]> dn2idComparator =
+            srcEntryContainer.getDN2ID().getComparator();
+        try
+        {
+          for(DN excludedDN : importContext.getExcludeBranches())
+          {
+            byte[] suffix =
+                StaticUtils.getBytes(excludedDN.toNormalizedString());
+            key.setData(suffix);
+            status = cursor.getSearchKeyRange(key, data, lockMode);
+
+            if(status == OperationStatus.SUCCESS &&
+                Arrays.equals(key.getData(), suffix))
+            {
+              // This is the base entry for a branch that was excluded in the
+              // import so we must migrate all entries in this branch over to
+              // the new entry container.
+
+              byte[] end =
+                  StaticUtils.getBytes("," + excludedDN.toNormalizedString());
+              end[0] = (byte) (end[0] + 1);
+
+              while(status == OperationStatus.SUCCESS &&
+                  dn2idComparator.compare(key.getData(), end) < 0)
               {
-                // No action needed.
+                if(threads.size() <= 0)
+                {
+                  msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+                  message = getMessage(msgID);
+                  throw new JebException(msgID, message);
+                }
+
+                EntryID id = new EntryID(data);
+                Entry entry = srcEntryContainer.getID2Entry().get(null, id);
+                processEntry(importContext, entry);
+
+                entriesProcessed++;
+                migratedCount++;
+                if (entriesProcessed >= importPassSize)
+                {
+                  merge(true);
+                  entriesProcessed = 0;
+                }
+                status = cursor.getNext(key, data, lockMode);
               }
             }
           }
         }
-
-      }
-      finally
-      {
-        timer.cancel();
-      }
-    }
-    finally
-    {
-      // Order the threads to stop.
-      for (ImportThread t : threads)
-      {
-        t.stopProcessing();
-      }
-
-      // Wait for each thread to stop.
-      for (ImportThread t : threads)
-      {
-        try
+        finally
         {
-          t.join();
-          importedCount += t.getImportedCount();
-        }
-        catch (InterruptedException ie)
-        {
-          // No action needed?
+          cursor.close();
         }
       }
     }
-
-
-    return moreData;
   }
 
   /**
@@ -799,6 +1016,133 @@
     return importContext;
   }
 
+  private ImportContext getImportContext(EntryContainer entryContainer,
+                                         long bufferSize)
+      throws DatabaseException, JebException, ConfigException
+  {
+    DN baseDN = entryContainer.getBaseDN();
+    EntryContainer srcEntryContainer = null;
+    List<DN> includeBranches = new ArrayList<DN>();
+    List<DN> excludeBranches = new ArrayList<DN>();
+
+    if(!ldifImportConfig.appendToExistingData() &&
+        !ldifImportConfig.clearBackend())
+    {
+      for(DN dn : ldifImportConfig.getExcludeBranches())
+      {
+        if(baseDN.equals(dn))
+        {
+          // This entire base DN was explicitly excluded. Skip.
+          return null;
+        }
+        if(baseDN.isAncestorOf(dn))
+        {
+          excludeBranches.add(dn);
+        }
+      }
+
+      if(!ldifImportConfig.getIncludeBranches().isEmpty())
+      {
+        for(DN dn : ldifImportConfig.getIncludeBranches())
+        {
+          if(baseDN.isAncestorOf(dn))
+          {
+            includeBranches.add(dn);
+          }
+        }
+
+        if(includeBranches.isEmpty())
+        {
+          // There are no branches in the explicitly defined include list under
+          // this base DN. Skip this base DN alltogether.
+
+          return null;
+        }
+
+        // Remove any overlapping include branches.
+        for(DN includeDN : includeBranches)
+        {
+          boolean keep = true;
+          for(DN dn : includeBranches)
+          {
+            if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
+            {
+              keep = false;
+              break;
+            }
+          }
+          if(!keep)
+          {
+            includeBranches.remove(includeDN);
+          }
+        }
+
+        // Remvoe any exclude branches that are not are not under a include
+        // branch since they will be migrated as part of the existing entries
+        // outside of the include branches anyways.
+        for(DN excludeDN : excludeBranches)
+        {
+          boolean keep = false;
+          for(DN includeDN : includeBranches)
+          {
+            if(includeDN.isAncestorOf(excludeDN))
+            {
+              keep = true;
+              break;
+            }
+          }
+          if(!keep)
+          {
+            excludeBranches.remove(excludeDN);
+          }
+        }
+
+        if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
+            includeBranches.get(0).equals(baseDN))
+        {
+          // This entire base DN is explicitly included in the import with
+          // no exclude branches that we need to migrate. Just clear the entry
+          // container.
+          entryContainer.exclusiveLock.lock();
+          entryContainer.clear();
+          entryContainer.exclusiveLock.unlock();
+        }
+        else
+        {
+          // Create a temp entry container
+          srcEntryContainer = entryContainer;
+          entryContainer =
+              rootContainer.openEntryContainer(baseDN,
+                                               baseDN.toNormalizedString() +
+                                                   "_importTmp");
+        }
+      }
+    }
+
+    // Create an import context.
+    ImportContext importContext = new ImportContext();
+    importContext.setBufferSize(bufferSize);
+    importContext.setConfig(config);
+    importContext.setLDIFImportConfig(this.ldifImportConfig);
+    importContext.setLDIFReader(reader);
+
+    importContext.setBaseDN(baseDN);
+    importContext.setEntryContainer(entryContainer);
+    importContext.setSrcEntryContainer(srcEntryContainer);
+    importContext.setBufferSize(bufferSize);
+
+    // Create an entry queue.
+    LinkedBlockingQueue<Entry> queue =
+        new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
+    importContext.setQueue(queue);
+
+    // Set the include and exclude branches
+    importContext.setIncludeBranches(includeBranches);
+    importContext.setExcludeBranches(excludeBranches);
+
+    return importContext;
+  }
+
   /**
    * This class reports progress of the import job at fixed intervals.
    */
@@ -841,7 +1185,7 @@
      */
     public void run()
     {
-      long latestCount = reader.getEntriesRead();
+      long latestCount = reader.getEntriesRead() + migratedCount;
       long deltaCount = (latestCount - previousCount);
       long latestTime = System.currentTimeMillis();
       long deltaTime = latestTime - previousTime;
@@ -858,7 +1202,7 @@
 
       int msgID = MSGID_JEB_IMPORT_PROGRESS_REPORT;
       String message = getMessage(msgID, numRead, numIgnored, numRejected,
-                                  rate);
+                                  migratedCount, rate);
       logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                message, msgID);
 

--
Gitblit v1.10.0