mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
26.31.2007 f8ef0eed366445c5a341dbcc7882a7104c1cac1b
opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -26,10 +26,7 @@
 */
package org.opends.server.backends.jeb;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentStats;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.*;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.messages.JebMessages;
@@ -48,10 +45,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@@ -66,6 +60,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.messages.JebMessages.*;
import org.opends.server.admin.std.server.JEBackendCfg;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.config.ConfigException;
/**
 * Import from LDIF to a JE backend.
@@ -108,12 +104,30 @@
   */
  private int importedCount;
  /**
   * The number of entries migrated.
   */
  private int migratedCount;
  /**
   * The number of merge passes.
   */
  int mergePassNumber = 1;
  /**
   * The number of milliseconds between job progress reports.
   */
  private long progressInterval = 10000;
  /**
   * The progress report timer.
   */
  private Timer timer;
  private int entriesProcessed;
  private int importPassSize;
  /**
   * The import worker threads.
@@ -146,15 +160,25 @@
   * @throws IOException  If a problem occurs while opening the LDIF file for
   *                      reading, or while reading from the LDIF file.
   * @throws JebException If an error occurs in the JE backend.
   * @throws DirectoryException if a directory server related error occurs.
   * @throws ConfigException if a configuration related error occurs.
   */
  public LDIFImportResult importLDIF(RootContainer rootContainer)
      throws DatabaseException, IOException, JebException
      throws DatabaseException, IOException, JebException, DirectoryException,
             ConfigException
  {
    // Create an LDIF reader. Throws an exception if the file does not exist.
    reader = new LDIFReader(ldifImportConfig);
    this.rootContainer = rootContainer;
    this.config = rootContainer.getConfiguration();
    this.mergePassNumber = 1;
    this.entriesProcessed = 0;
    this.importPassSize   = config.getBackendImportPassSize();
    if (importPassSize <= 0)
    {
      importPassSize = Integer.MAX_VALUE;
    }
    int msgID;
    String message;
@@ -185,31 +209,15 @@
        TRACER.debugInfo(message);
      }
      // Create the import contexts for each base DN.
      DN baseDN;
      for (EntryContainer entryContainer : rootContainer.getEntryContainers())
      {
        baseDN = entryContainer.getBaseDN();
        ImportContext importContext =
            getImportContext(entryContainer, bufferSize);
        // Create an import context.
        ImportContext importContext = new ImportContext();
        importContext.setBufferSize(bufferSize);
        importContext.setConfig(config);
        importContext.setLDIFImportConfig(this.ldifImportConfig);
        importContext.setLDIFReader(reader);
        importContext.setBaseDN(baseDN);
        importContext.setContainerName(entryContainer.getContainerName());
        importContext.setEntryContainer(entryContainer);
        importContext.setBufferSize(bufferSize);
        // Create an entry queue.
        LinkedBlockingQueue<Entry> queue =
            new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
        importContext.setQueue(queue);
        importMap.put(baseDN, importContext);
        if(importContext != null)
        {
          importMap.put(entryContainer.getBaseDN(), importContext);
        }
      }
      // Make a note of the time we started.
@@ -232,53 +240,57 @@
        }
      }
      startWorkerThreads();
      try
      {
        importedCount = 0;
        int     passNumber = 1;
        boolean moreData   = true;
        while (moreData)
        {
          moreData = processLDIF();
          if (moreData)
          {
            msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
            message = getMessage(msgID, passNumber++);
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
          else
          {
            msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
            message = getMessage(msgID);
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
          long mergeStartTime = System.currentTimeMillis();
          merge();
          long mergeEndTime = System.currentTimeMillis();
          if (moreData)
          {
            msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
          else
          {
            msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
        }
        migratedCount = 0;
        migrateExistingEntries();
        processLDIF();
        migrateExcludedEntries();
      }
      finally
      {
        merge(false);
        tempDir.delete();
        for(ImportContext importContext : importMap.values())
        {
          DN baseDN = importContext.getBaseDN();
          EntryContainer srcEntryContainer =
              importContext.getSrcEntryContainer();
          if(srcEntryContainer != null)
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("Deleteing old entry container for base DN " +
                  "%s and renaming temp entry container", baseDN);
            }
            EntryContainer unregEC =
              rootContainer.unregisterEntryContainer(baseDN);
            //Make sure the unregistered EC for the base DN is the same as
            //the one in the import context.
            if(unregEC != srcEntryContainer)
            {
              if(debugEnabled())
              {
                TRACER.debugInfo("Current entry container used for base DN " +
                    "%s is not the same as the source entry container used " +
                    "during the migration process.", baseDN);
              }
              rootContainer.registerEntryContainer(baseDN, unregEC);
              continue;
            }
            srcEntryContainer.exclusiveLock.lock();
            srcEntryContainer.delete();
            srcEntryContainer.exclusiveLock.unlock();
            EntryContainer newEC = importContext.getEntryContainer();
            newEC.exclusiveLock.lock();
            newEC.setDatabasePrefix(baseDN.toNormalizedString());
            newEC.exclusiveLock.unlock();
            rootContainer.registerEntryContainer(baseDN, newEC);
          }
        }
      }
    }
    finally
@@ -296,9 +308,11 @@
    }
    msgID = MSGID_JEB_IMPORT_FINAL_STATUS;
    message = getMessage(msgID, reader.getEntriesRead(), importedCount,
    message = getMessage(msgID, reader.getEntriesRead(),
                         importedCount - migratedCount,
                         reader.getEntriesIgnored(),
                         reader.getEntriesRejected(), importTime/1000, rate);
                         reader.getEntriesRejected(),
                         migratedCount, importTime/1000, rate);
    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
             message, msgID);
@@ -314,135 +328,169 @@
  /**
   * Merge the intermediate files to load the index databases.
   *
   * @param moreData <CODE>true</CODE> if this is a intermediate merge or
   * <CODE>false</CODE> if this is a final merge.
   * @throws DatabaseException If an error occurs in the JE database.
   */
  public void merge()
  private void merge(boolean moreData) throws DatabaseException
  {
    ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
    stopWorkerThreads();
    // Create merge threads for each base DN.
    for (ImportContext importContext : importMap.values())
    try
    {
      EntryContainer entryContainer = importContext.getEntryContainer();
      // For each configured attribute index.
      for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
      if (moreData)
      {
        int indexEntryLimit = config.getBackendIndexEntryLimit();
        if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
        int msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
        String message = getMessage(msgID, mergePassNumber++);
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
      }
      else
      {
        int msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
        String message = getMessage(msgID);
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
      }
      long mergeStartTime = System.currentTimeMillis();
      ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
      // Create merge threads for each base DN.
      for (ImportContext importContext : importMap.values())
      {
        EntryContainer entryContainer = importContext.getEntryContainer();
        // For each configured attribute index.
        for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
        {
          indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
          int indexEntryLimit = config.getBackendIndexEntryLimit();
          if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
          {
            indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
          }
          if (attrIndex.equalityIndex != null)
          {
            Index index = attrIndex.equalityIndex;
            IndexMergeThread indexMergeThread =
                new IndexMergeThread(config,
                                     ldifImportConfig, index,
                                     indexEntryLimit);
            mergers.add(indexMergeThread);
          }
          if (attrIndex.presenceIndex != null)
          {
            Index index = attrIndex.presenceIndex;
            IndexMergeThread indexMergeThread =
                new IndexMergeThread(config,
                                     ldifImportConfig, index,
                                     indexEntryLimit);
            mergers.add(indexMergeThread);
          }
          if (attrIndex.substringIndex != null)
          {
            Index index = attrIndex.substringIndex;
            IndexMergeThread indexMergeThread =
                new IndexMergeThread(config,
                                     ldifImportConfig, index,
                                     indexEntryLimit);
            mergers.add(indexMergeThread);
          }
          if (attrIndex.orderingIndex != null)
          {
            Index index = attrIndex.orderingIndex;
            IndexMergeThread indexMergeThread =
                new IndexMergeThread(config,
                                     ldifImportConfig, index,
                                     indexEntryLimit);
            mergers.add(indexMergeThread);
          }
          if (attrIndex.approximateIndex != null)
          {
            Index index = attrIndex.approximateIndex;
            IndexMergeThread indexMergeThread =
                new IndexMergeThread(config,
                                     ldifImportConfig, index,
                                     indexEntryLimit);
            mergers.add(indexMergeThread);
          }
        }
        if (attrIndex.equalityIndex != null)
        // Id2Children index.
        Index id2Children = entryContainer.getID2Children();
        IndexMergeThread indexMergeThread =
            new IndexMergeThread(config,
                                 ldifImportConfig,
                                 id2Children,
                                 config.getBackendIndexEntryLimit());
        mergers.add(indexMergeThread);
        // Id2Subtree index.
        Index id2Subtree = entryContainer.getID2Subtree();
        indexMergeThread =
            new IndexMergeThread(config,
                                 ldifImportConfig,
                                 id2Subtree,
                                 config.getBackendIndexEntryLimit());
        mergers.add(indexMergeThread);
      }
      // Run all the merge threads.
      for (IndexMergeThread imt : mergers)
      {
        imt.start();
      }
      // Wait for the threads to finish.
      for (IndexMergeThread imt : mergers)
      {
        try
        {
          Index index = attrIndex.equalityIndex;
          IndexMergeThread indexMergeThread =
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
          imt.join();
        }
        if (attrIndex.presenceIndex != null)
        catch (InterruptedException e)
        {
          Index index = attrIndex.presenceIndex;
          IndexMergeThread indexMergeThread =
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (attrIndex.substringIndex != null)
        {
          Index index = attrIndex.substringIndex;
          IndexMergeThread indexMergeThread =
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (attrIndex.orderingIndex != null)
        {
          Index index = attrIndex.orderingIndex;
          IndexMergeThread indexMergeThread =
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (attrIndex.approximateIndex != null)
        {
          Index index = attrIndex.approximateIndex;
          IndexMergeThread indexMergeThread =
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
      // Id2Children index.
      Index id2Children = entryContainer.getID2Children();
      IndexMergeThread indexMergeThread =
          new IndexMergeThread(config,
                               ldifImportConfig,
                               id2Children,
                               config.getBackendIndexEntryLimit());
      mergers.add(indexMergeThread);
      long mergeEndTime = System.currentTimeMillis();
      // Id2Subtree index.
      Index id2Subtree = entryContainer.getID2Subtree();
      indexMergeThread =
          new IndexMergeThread(config,
                               ldifImportConfig,
                               id2Subtree,
                               config.getBackendIndexEntryLimit());
      mergers.add(indexMergeThread);
    }
    // Run all the merge threads.
    for (IndexMergeThread imt : mergers)
    {
      imt.start();
    }
    // Wait for the threads to finish.
    for (IndexMergeThread imt : mergers)
    {
      try
      if (moreData)
      {
        imt.join();
        int msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
        String message =
            getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
      }
      catch (InterruptedException e)
      else
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        int msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
        String message =
            getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
      }
    }
    finally
    {
      if(moreData)
      {
        startWorkerThreads();
      }
    }
  }
  /**
   * Create a set of worker threads, one set for each base DN.
   * Read each entry from the LDIF and determine which
   * base DN the entry belongs to. Write the dn2id database, then put the
   * entry on the appropriate queue for the worker threads to consume.
   * Record the entry count for each base DN when all entries have been
   * processed.
   *
   * @return true if thre is more data to be read from the LDIF file (the import
   * pass size was reached), false if the entire LDIF file has been read.
   *
   * @throws JebException If an error occurs in the JE backend.
   * @throws DatabaseException If an error occurs in the JE database.
   * @throws  IOException  If a problem occurs while opening the LDIF file for
   *                       reading, or while reading from the LDIF file.
   */
  private boolean processLDIF()
      throws JebException, DatabaseException, IOException
  private void startWorkerThreads() throws DatabaseException
  {
    boolean moreData = false;
    // Create one set of worker threads for each base DN.
    int importThreadCount = config.getBackendImportThreadCount();
    for (ImportContext ic : importMap.values())
@@ -457,122 +505,291 @@
      }
    }
    try
    // Start a timer for the progress report.
    timer = new Timer();
    TimerTask progressTask = new ImportJob.ProgressTask();
    timer.scheduleAtFixedRate(progressTask, progressInterval,
                              progressInterval);
  }
  private void stopWorkerThreads()
  {
    if(threads.size() > 0)
    {
      // Create a counter to use to determine whether we've hit the import
      // pass size.
      int entriesProcessed = 0;
      int importPassSize   = config.getBackendImportPassSize();
      if (importPassSize <= 0)
      // Wait for the queues to be drained.
      for (ImportContext ic : importMap.values())
      {
        importPassSize = Integer.MAX_VALUE;
      }
      // Start a timer for the progress report.
      Timer timer = new Timer();
      TimerTask progressTask = new ImportJob.ProgressTask();
      timer.scheduleAtFixedRate(progressTask, progressInterval,
                                progressInterval);
      try
      {
        do
        while (ic.getQueue().size() > 0)
        {
          if(threads.size() <= 0)
          {
            int msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
            String msg = getMessage(msgID);
            throw new JebException(msgID, msg);
          }
          try
          {
            // Read the next entry.
            Entry entry = reader.readEntry();
            // Check for end of file.
            if (entry == null)
            {
              break;
            }
            // Route it according to base DN.
            ImportContext importContext = getImportConfig(entry.getDN());
            processEntry(importContext, entry);
            entriesProcessed++;
            if (entriesProcessed >= importPassSize)
            {
              moreData = true;
              break;
            }
          }
          catch (LDIFException e)
            Thread.sleep(100);
          } catch (Exception e)
          {
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
            // No action needed.
          }
          catch (DirectoryException e)
          {
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
          }
        } while (true);
        }
      }
    }
        if(threads.size() > 0)
    // Order the threads to stop.
    for (ImportThread t : threads)
    {
      t.stopProcessing();
    }
    // Wait for each thread to stop.
    for (ImportThread t : threads)
    {
      try
      {
        t.join();
        importedCount += t.getImportedCount();
      }
      catch (InterruptedException ie)
      {
        // No action needed?
      }
    }
    timer.cancel();
  }
  /**
   * Create a set of worker threads, one set for each base DN.
   * Read each entry from the LDIF and determine which
   * base DN the entry belongs to. Write the dn2id database, then put the
   * entry on the appropriate queue for the worker threads to consume.
   * Record the entry count for each base DN when all entries have been
   * processed.
   *
   * pass size was reached), false if the entire LDIF file has been read.
   *
   * @throws JebException If an error occurs in the JE backend.
   * @throws DatabaseException If an error occurs in the JE database.
   * @throws  IOException  If a problem occurs while opening the LDIF file for
   *                       reading, or while reading from the LDIF file.
   */
  private void processLDIF()
      throws JebException, DatabaseException, IOException
  {
    int msgID = MSGID_JEB_IMPORT_LDIF_START;
    String message = getMessage(msgID);
    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
             message, msgID);
    do
    {
      if(threads.size() <= 0)
      {
        msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
        message = getMessage(msgID);
        throw new JebException(msgID, message);
      }
      try
      {
        // Read the next entry.
        Entry entry = reader.readEntry();
        // Check for end of file.
        if (entry == null)
        {
          // Wait for the queues to be drained.
          for (ImportContext ic : importMap.values())
          msgID = MSGID_JEB_IMPORT_LDIF_END;
          message = getMessage(msgID);
          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                   message, msgID);
          break;
        }
        // Route it according to base DN.
        ImportContext importContext = getImportConfig(entry.getDN());
        processEntry(importContext, entry);
        entriesProcessed++;
        if (entriesProcessed >= importPassSize)
        {
          merge(false);
          entriesProcessed = 0;
        }
      }
      catch (LDIFException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
      catch (DirectoryException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    } while (true);
  }
  private void migrateExistingEntries()
      throws JebException, DatabaseException, DirectoryException
  {
    for(ImportContext importContext : importMap.values())
    {
      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !importContext.getIncludeBranches().isEmpty())
      {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        int msgID = MSGID_JEB_IMPORT_MIGRATION_START;
        String message = getMessage(msgID, "existing",
                                    importContext.getBaseDN());
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        try
        {
          status = cursor.getFirst(key, data, lockMode);
          while(status == OperationStatus.SUCCESS)
          {
            while (ic.getQueue().size() > 0)
            if(threads.size() <= 0)
            {
              try
              msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
              message = getMessage(msgID);
              throw new JebException(msgID, message);
            }
            DN dn = DN.decode(new ASN1OctetString(key.getData()));
            if(!importContext.getIncludeBranches().contains(dn))
            {
              EntryID id = new EntryID(data);
              Entry entry = srcEntryContainer.getID2Entry().get(null, id);
              processEntry(importContext, entry);
              entriesProcessed++;
              migratedCount++;
              if (entriesProcessed >= importPassSize)
              {
                Thread.sleep(100);
              } catch (Exception e)
                merge(true);
                entriesProcessed = 0;
              }
              status = cursor.getNext(key, data, lockMode);
            }
            else
            {
              // This is the base entry for a branch that will be included
              // in the import so we don't want to copy the branch to the new
              // entry container.
              /**
               * Advance the cursor to next entry at the same level in the DIT
               * skipping all the entries in this branch.
               * Set the next starting value to a value of equal length but
               * slightly greater than the previous DN. Since keys are compared
               * in reverse order we must set the first byte (the comma).
               * No possibility of overflow here.
               */
              byte[] begin =
                  StaticUtils.getBytes("," + dn.toNormalizedString());
              begin[0] = (byte) (begin[0] + 1);
              key.setData(begin);
              status = cursor.getSearchKeyRange(key, data, lockMode);
            }
          }
        }
        finally
        {
          cursor.close();
        }
      }
    }
  }
  private void migrateExcludedEntries()
      throws JebException, DatabaseException
  {
    for(ImportContext importContext : importMap.values())
    {
      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !importContext.getExcludeBranches().isEmpty())
      {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        int msgID = MSGID_JEB_IMPORT_MIGRATION_START;
        String message = getMessage(msgID, "excluded",
                                    importContext.getBaseDN());
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        Comparator<byte[]> dn2idComparator =
            srcEntryContainer.getDN2ID().getComparator();
        try
        {
          for(DN excludedDN : importContext.getExcludeBranches())
          {
            byte[] suffix =
                StaticUtils.getBytes(excludedDN.toNormalizedString());
            key.setData(suffix);
            status = cursor.getSearchKeyRange(key, data, lockMode);
            if(status == OperationStatus.SUCCESS &&
                Arrays.equals(key.getData(), suffix))
            {
              // This is the base entry for a branch that was excluded in the
              // import so we must migrate all entries in this branch over to
              // the new entry container.
              byte[] end =
                  StaticUtils.getBytes("," + excludedDN.toNormalizedString());
              end[0] = (byte) (end[0] + 1);
              while(status == OperationStatus.SUCCESS &&
                  dn2idComparator.compare(key.getData(), end) < 0)
              {
                // No action needed.
                if(threads.size() <= 0)
                {
                  msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
                  message = getMessage(msgID);
                  throw new JebException(msgID, message);
                }
                EntryID id = new EntryID(data);
                Entry entry = srcEntryContainer.getID2Entry().get(null, id);
                processEntry(importContext, entry);
                entriesProcessed++;
                migratedCount++;
                if (entriesProcessed >= importPassSize)
                {
                  merge(true);
                  entriesProcessed = 0;
                }
                status = cursor.getNext(key, data, lockMode);
              }
            }
          }
        }
      }
      finally
      {
        timer.cancel();
      }
    }
    finally
    {
      // Order the threads to stop.
      for (ImportThread t : threads)
      {
        t.stopProcessing();
      }
      // Wait for each thread to stop.
      for (ImportThread t : threads)
      {
        try
        finally
        {
          t.join();
          importedCount += t.getImportedCount();
        }
        catch (InterruptedException ie)
        {
          // No action needed?
          cursor.close();
        }
      }
    }
    return moreData;
  }
  /**
@@ -799,6 +1016,133 @@
    return importContext;
  }
  private ImportContext getImportContext(EntryContainer entryContainer,
                                         long bufferSize)
      throws DatabaseException, JebException, ConfigException
  {
    DN baseDN = entryContainer.getBaseDN();
    EntryContainer srcEntryContainer = null;
    List<DN> includeBranches = new ArrayList<DN>();
    List<DN> excludeBranches = new ArrayList<DN>();
    if(!ldifImportConfig.appendToExistingData() &&
        !ldifImportConfig.clearBackend())
    {
      for(DN dn : ldifImportConfig.getExcludeBranches())
      {
        if(baseDN.equals(dn))
        {
          // This entire base DN was explicitly excluded. Skip.
          return null;
        }
        if(baseDN.isAncestorOf(dn))
        {
          excludeBranches.add(dn);
        }
      }
      if(!ldifImportConfig.getIncludeBranches().isEmpty())
      {
        for(DN dn : ldifImportConfig.getIncludeBranches())
        {
          if(baseDN.isAncestorOf(dn))
          {
            includeBranches.add(dn);
          }
        }
        if(includeBranches.isEmpty())
        {
          // There are no branches in the explicitly defined include list under
          // this base DN. Skip this base DN alltogether.
          return null;
        }
        // Remove any overlapping include branches.
        for(DN includeDN : includeBranches)
        {
          boolean keep = true;
          for(DN dn : includeBranches)
          {
            if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
            {
              keep = false;
              break;
            }
          }
          if(!keep)
          {
            includeBranches.remove(includeDN);
          }
        }
        // Remvoe any exclude branches that are not are not under a include
        // branch since they will be migrated as part of the existing entries
        // outside of the include branches anyways.
        for(DN excludeDN : excludeBranches)
        {
          boolean keep = false;
          for(DN includeDN : includeBranches)
          {
            if(includeDN.isAncestorOf(excludeDN))
            {
              keep = true;
              break;
            }
          }
          if(!keep)
          {
            excludeBranches.remove(excludeDN);
          }
        }
        if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
            includeBranches.get(0).equals(baseDN))
        {
          // This entire base DN is explicitly included in the import with
          // no exclude branches that we need to migrate. Just clear the entry
          // container.
          entryContainer.exclusiveLock.lock();
          entryContainer.clear();
          entryContainer.exclusiveLock.unlock();
        }
        else
        {
          // Create a temp entry container
          srcEntryContainer = entryContainer;
          entryContainer =
              rootContainer.openEntryContainer(baseDN,
                                               baseDN.toNormalizedString() +
                                                   "_importTmp");
        }
      }
    }
    // Create an import context.
    ImportContext importContext = new ImportContext();
    importContext.setBufferSize(bufferSize);
    importContext.setConfig(config);
    importContext.setLDIFImportConfig(this.ldifImportConfig);
    importContext.setLDIFReader(reader);
    importContext.setBaseDN(baseDN);
    importContext.setEntryContainer(entryContainer);
    importContext.setSrcEntryContainer(srcEntryContainer);
    importContext.setBufferSize(bufferSize);
    // Create an entry queue.
    LinkedBlockingQueue<Entry> queue =
        new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
    importContext.setQueue(queue);
    // Set the include and exclude branches
    importContext.setIncludeBranches(includeBranches);
    importContext.setExcludeBranches(excludeBranches);
    return importContext;
  }
  /**
   * This class reports progress of the import job at fixed intervals.
   */
@@ -841,7 +1185,7 @@
     */
    public void run()
    {
      long latestCount = reader.getEntriesRead();
      long latestCount = reader.getEntriesRead() + migratedCount;
      long deltaCount = (latestCount - previousCount);
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
@@ -858,7 +1202,7 @@
      int msgID = MSGID_JEB_IMPORT_PROGRESS_REPORT;
      String message = getMessage(msgID, numRead, numIgnored, numRejected,
                                  rate);
                                  migratedCount, rate);
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
               message, msgID);