From 328ec50e683c622586d30aeb9dee55bebdebfe0c Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Fri, 24 Jul 2009 22:32:43 +0000
Subject: [PATCH] Commit of new import code.

---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 2792 +++++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 1,869 insertions(+), 923 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 0bfdb50..e428796 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -27,954 +27,1756 @@
 
 package org.opends.server.backends.jeb.importLDIF;
 
-import org.opends.server.types.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.ErrorLogger.logError;
+
+import static org.opends.messages.JebMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.DynamicConstants.*;
+import static org.opends.server.util.ServerConstants.*;
+
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.opends.server.util.StaticUtils.getFileForPath;
+import org.opends.messages.Message;
+import org.opends.messages.Category;
+import org.opends.messages.Severity;
 import org.opends.server.admin.std.server.LocalDBBackendCfg;
-import org.opends.server.util.LDIFReader;
-import org.opends.server.util.StaticUtils;
-import org.opends.server.util.LDIFException;
-import org.opends.server.util.RuntimeInformation;
-import static org.opends.server.util.DynamicConstants.BUILD_ID;
-import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
+import org.opends.server.backends.jeb.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.backends.jeb.*;
-import org.opends.messages.Message;
-import org.opends.messages.JebMessages;
-import static org.opends.messages.JebMessages.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.*;
-import java.io.IOException;
-
+import org.opends.server.types.*;
+import org.opends.server.util.*;
 import com.sleepycat.je.*;
 
+
 /**
  * Performs a LDIF import.
  */
+public class Importer
+{
+  private final int DRAIN_TO = 3;
+  private final int TIMER_INTERVAL = 10000;
+  private final int MB =  (1024 * 1024);
+  private final int LDIF_READER_BUF_SIZE = 2 * MB;
+  private final int MIN_IMPORT_MEM_REQUIRED = 16 * MB;
+  private final int MAX_BUFFER_SIZE = 48 * MB;
+  private final int MIN_BUFFER_SIZE = 1024 * 100;
+  private final int MIN_READ_AHEAD_CACHE_SIZE = 4096;
+  private final int MAX_DB_CACHE_SIZE = 128 * MB;
+  private final int MIN_DB_CACHE_SIZE = 16 * MB;
+  private final int MAX_DB_LOG_BUF_BYTES = 100 * MB;
+  private final int MEM_PCT_PHASE_1 = 60;
+  private final int MEM_PCT_PHASE_2 = 50;
 
-public class Importer implements Thread.UncaughtExceptionHandler {
+  private final String DIRECT_PROPERTY = "import.directphase2";
 
+  private final AtomicInteger bufferCount = new AtomicInteger(0);
+  private final File tempDir;
+  private final int indexCount, threadCount;
+  private final boolean dn2idPhase2;
+  private final LDIFImportConfig config;
+  private final ByteBuffer directBuffer;
 
-  /**
-   * The tracer object for the debug logger.
-   */
-  private static final DebugTracer TRACER = getTracer();
-
-  /**
-   * The JE backend configuration.
-   */
-  private LocalDBBackendCfg config;
-
-  /**
-   * The root container used for this import job.
-   */
   private RootContainer rootContainer;
-
-  /**
-   * The LDIF import configuration.
-   */
-  private LDIFImportConfig ldifImportConfig;
-
-  /**
-   * The LDIF reader.
-   */
   private LDIFReader reader;
-
-  /**
-   * Map of base DNs to their import context.
-   */
-  private LinkedHashMap<DN, DNContext> importMap =
-      new LinkedHashMap<DN, DNContext>();
+  private int bufferSize;
+  private long dbCacheSize = 0, dbLogBufSize = 0;
 
 
-  /**
-    * The number of entries migrated.
-    */
-   private int migratedCount;
+  //The executor service used for the sort tasks.
+  private ExecutorService sortService;
 
-  /**
-   * The number of entries imported.
-   */
-  private int importedCount;
+  //The executor service used for the index processing tasks.
+  private ExecutorService indexProcessService;
 
-  /**
-   * The number of milliseconds between job progress reports.
-   */
-  private long progressInterval = 10000;
+  //Queue of free index buffers -- used to re-cycle index buffers;
+  private final BlockingQueue<IndexBuffer> freeBufQue =
+          new LinkedBlockingQueue<IndexBuffer>();
 
-  /**
-   * The progress report timer.
-   */
-  private Timer timer;
+  //Map of DB containers to que of index buffers.  Used to allocate sorted
+  //index buffers to a index writer thread.
+  private final
+  Map<DatabaseContainer, BlockingQueue<IndexBuffer>> containerQueMap =
+          new LinkedHashMap<DatabaseContainer, BlockingQueue<IndexBuffer>>();
 
-  //Thread array.
-  private CopyOnWriteArrayList<WorkThread> threads;
+  //Map of DB containers to index managers. Used to start phase 2.
+  private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap =
+          new LinkedHashMap<DatabaseContainer, IndexManager>();
 
-  //Progress task.
-  private ProgressTask pTask;
+  //Futures used to indicate when the index file writers are done flushing
+  //their work queues and have exited. End of phase one.
+  private final List<Future<?>> indexWriterFutures;
 
-  //Number of entries import before checking if cleaning is needed after
-  //eviction has been detected.
-  private static final int entryCleanInterval = 250000;
+  //List of index file writer tasks. Used to signal stopIndexWriterTasks to the
+  //index file writer tasks when the LDIF file has been done.
+  private final List<IndexFileWriterTask> indexWriterList;
 
-  //Minimum buffer amount to give to a buffer manager.
-  private static final long minBuffer = 1024 * 1024;
-
-  //Total available memory for the buffer managers.
-  private long totalAvailBufferMemory = 0;
-
-  //Memory size to be used for the DB cache in string format.
-  private String dbCacheSizeStr;
-
-  //Used to do an initial clean after eviction has been detected.
-  private boolean firstClean=false;
-
-  //A thread threw an Runtime exception stop the import.
-  private boolean unCaughtExceptionThrown = false;
-
-  //Set to true if substring indexes are defined.
-  private boolean hasSubIndexes = false;
-
-  //Work thread 0, used to add the first 20 or so entries single threaded.
-  private WorkThread workThread0;
-
-  //Counter for thread 0;
-  private int worker0Proc=0;
-
-  //Max thread 0 adds.
-  private static final int maxWorker0 = 20;
+  //Map of DNs to Suffix objects. Placeholder for when multiple suffixes are
+  //supported.
+  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
 
   /**
    * Create a new import job with the specified ldif import config.
    *
-   * @param ldifImportConfig The LDIF import config.
-   * @param hasSubIndexes <CODE>True</CODE> If substring indexes are defined.
+   * @param config The LDIF import config.
+   * @param cfg The local DB backend config.
+   * @throws IOException  If a problem occurs while opening the LDIF file for
+   *                      reading.
    */
-  public Importer(LDIFImportConfig ldifImportConfig, boolean hasSubIndexes)
+  public Importer(LDIFImportConfig config,
+                  LocalDBBackendCfg cfg )
+          throws IOException
   {
-    this.ldifImportConfig = ldifImportConfig;
-    this.threads = new CopyOnWriteArrayList<WorkThread>();
-    this.hasSubIndexes = hasSubIndexes;
-    calcMemoryLimits();
+    this.config = config;
+    threadCount = cfg.getImportThreadCount();
+    indexCount = cfg.listLocalDBIndexes().length + 2;
+    indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount);
+    indexWriterFutures = new CopyOnWriteArrayList<Future<?>>();
+    File parentDir;
+    if(config.getTmpDirectory() == null)
+    {
+      parentDir = getFileForPath("import-tmp");
+    }
+    else
+    {
+       parentDir = getFileForPath(config.getTmpDirectory());
+    }
+    tempDir = new File(parentDir, cfg.getBackendId());
+    if(!tempDir.exists() && !tempDir.mkdirs())
+    {
+      Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(
+              String.valueOf(tempDir));
+      throw new IOException(msg.toString());
+    }
+    if (tempDir.listFiles() != null)
+    {
+      for (File f : tempDir.listFiles())
+      {
+        f.delete();
+      }
+    }
+    dn2idPhase2 = config.getDNCheckPhase2();
+    String propString = System.getProperty(DIRECT_PROPERTY);
+    if(propString != null)
+    {
+      int directSize = Integer.valueOf(propString);
+      directBuffer = ByteBuffer.allocateDirect(directSize);
+    }
+    else
+    {
+     directBuffer = null;
+    }
+  }
+
+  private void getBufferSizes(long availMem, int buffers)
+  {
+    long mem = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_BUF_BYTES);
+    bufferSize = (int) (mem/buffers);
+    if(bufferSize >= MIN_BUFFER_SIZE)
+    {
+      dbCacheSize =  MAX_DB_CACHE_SIZE;
+      dbLogBufSize = MAX_DB_LOG_BUF_BYTES;
+      if(bufferSize > MAX_BUFFER_SIZE)
+      {
+        bufferSize = MAX_BUFFER_SIZE;
+      }
+    }
+    else
+    {
+      mem = availMem - MIN_DB_CACHE_SIZE - (MIN_DB_CACHE_SIZE * 7) / 100;
+      bufferSize = (int) (mem/buffers);
+      dbCacheSize =  MIN_DB_CACHE_SIZE;
+      if(bufferSize < MIN_BUFFER_SIZE)
+      {
+        System.out.println("Log size less than default -- give it a try");
+        bufferSize = MIN_BUFFER_SIZE;
+      }
+      else
+      {
+        long constrainedMem = mem - (buffers * MIN_BUFFER_SIZE);
+        bufferSize = (int) ((buffers * MIN_BUFFER_SIZE) +
+                            (constrainedMem * 50/100));
+        bufferSize /= buffers;
+        dbCacheSize = MIN_DB_CACHE_SIZE + (constrainedMem * 50/100);
+      }
+    }
+  }
+
+
+  /**
+   * Return the suffix instance in the specified map that matches the specified
+   * DN.
+   *
+   * @param dn The DN to search for.
+   * @param map The map to search.
+   * @return The suffix instance that matches the DN, or null if no match is
+   *         found.
+   */
+  public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
+  {
+    Suffix suffix = null;
+    DN nodeDN = dn;
+
+    while (suffix == null && nodeDN != null) {
+      suffix = map.get(nodeDN);
+      if (suffix == null)
+      {
+        nodeDN = nodeDN.getParentDNInSuffix();
+      }
+    }
+    return suffix;
   }
 
   /**
-   * Start the worker threads.
+   * Calculate buffer sizes and initialize JEB properties based on memory.
    *
-   * @throws DatabaseException If a DB problem occurs.
+   * @param envConfig The environment config to use in the calculations.
+   *
+   * @throws InitializationException If a problem occurs during calculation.
    */
-  private void startWorkerThreads()
-          throws DatabaseException {
-
-    int importThreadCount = config.getImportThreadCount();
-    //Figure out how much buffer memory to give to each context.
-    int contextCount = importMap.size();
-    long memoryPerContext = totalAvailBufferMemory / contextCount;
-    //Below min, use the min value.
-    if(memoryPerContext < minBuffer) {
-      Message msg =
-            NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
-                                                             minBuffer);
+  public void init(EnvironmentConfig envConfig)
+          throws InitializationException
+  {
+    Message msg;
+    Runtime runtime = Runtime.getRuntime();
+    long freeMemory = runtime.freeMemory();
+    long availMemImport = (freeMemory * MEM_PCT_PHASE_1) / 100;
+    int phaseOneBuffers = 2 * (indexCount * threadCount);
+    msg = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availMemImport, phaseOneBuffers);
+    logError(msg);
+    if (availMemImport < MIN_IMPORT_MEM_REQUIRED)
+    {
+      msg = ERR_IMPORT_LDIF_LACK_MEM.get(16);
+      throw new InitializationException(msg);
+    }
+    getBufferSizes(availMemImport, phaseOneBuffers);
+    envConfig.setConfigParam("je.maxMemory", Long.toString(dbCacheSize));
+    msg = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
+    logError(msg);
+    if(dbLogBufSize != 0)
+    {
+      envConfig.setConfigParam("je.log.totalBufferBytes",
+              Long.toString(dbLogBufSize));
+      msg = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(dbLogBufSize);
       logError(msg);
-      memoryPerContext = minBuffer;
     }
-    // Create one set of worker threads/buffer managers for each base DN.
-    for (DNContext context : importMap.values()) {
-      BufferManager bufferManager =
-                        new BufferManager(memoryPerContext);
-      context.setBufferManager(bufferManager);
-      for (int i = 0; i < importThreadCount; i++) {
-        WorkThread t = new WorkThread(context.getWorkQueue(), i,
-                bufferManager, rootContainer, importMap);
-        t.setUncaughtExceptionHandler(this);
-        threads.add(t);
-        if(i == 0) {
-          workThread0 = t;
-        }
-        t.start();
-      }
-    }
-    // Start a timer for the progress report.
-    timer = new Timer();
-    TimerTask progressTask = new ProgressTask();
-    //Used to get at extra functionality such as eviction detected.
-    pTask = (ProgressTask) progressTask;
-    timer.scheduleAtFixedRate(progressTask, progressInterval,
-                              progressInterval);
-
+    return;
   }
 
 
+  private void initIndexBuffers(int threadCount)
+  {
+    int bufferCount = 2 * (indexCount * threadCount);
+    for(int i = 0; i < bufferCount; i++)
+    {
+      IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
+      freeBufQue.add(b);
+    }
+  }
+
+
+
+  private void initSuffixes()
+          throws ConfigException, InitializationException
+  {
+    Iterator<EntryContainer> i = rootContainer.getEntryContainers().iterator();
+    EntryContainer ec = i.next();
+    Suffix suffix = Suffix.createSuffixContext(ec, config, rootContainer);
+    dnSuffixMap.put(ec.getBaseDN(), suffix);
+  }
+
+
+
   /**
    * Import a ldif using the specified root container.
    *
-   * @param rootContainer  The root container.
+   * @param rootContainer The root container to use during the import.
+   *
    * @return A LDIF result.
-   * @throws DatabaseException  If a DB error occurs.
-   * @throws IOException If a IO error occurs.
-   * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
-   * @throws DirectoryException If a directory error occurs.
-   * @throws ConfigException If a configuration has an error.
+   * @throws ConfigException If the import failed because of an configuration
+   *                         error.
+   * @throws IOException If the import failed because of an IO error.
+   * @throws InitializationException If the import failed because of an
+   *               initialization error.
+   * @throws JebException If the import failed due to a database error.
+   * @throws InterruptedException If the import failed due to an interrupted
+   *                              error.
+   * @throws ExecutionException If the import failed due to an execution error.
    */
-  public LDIFImportResult processImport(RootContainer rootContainer)
-      throws DatabaseException, IOException, JebException, DirectoryException,
-            ConfigException {
-
-    // Create an LDIF reader. Throws an exception if the file does not exist.
-    reader = new LDIFReader(ldifImportConfig);
+  public LDIFImportResult
+  processImport(RootContainer rootContainer) throws ConfigException,
+          InitializationException, IOException, JebException,
+          InterruptedException, ExecutionException
+  {
     this.rootContainer = rootContainer;
-    this.config = rootContainer.getConfiguration();
-
-    Message message;
-    long startTime;
-    try {
-      int importThreadCount = config.getImportThreadCount();
-      message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
-                                                     BUILD_ID, REVISION_NUMBER);
-      logError(message);
-      message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
-      logError(message);
-      RuntimeInformation.logInfo();
-      for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
-        DNContext DNContext =  getImportContext(entryContainer);
-        if(DNContext != null) {
-          importMap.put(entryContainer.getBaseDN(), DNContext);
-        }
-      }
-      // Make a note of the time we started.
-      startTime = System.currentTimeMillis();
-      startWorkerThreads();
-      try {
-        importedCount = 0;
-        migratedCount = 0;
-        migrateExistingEntries();
-        processLDIF();
-        migrateExcludedEntries();
-      } finally {
-        if(!unCaughtExceptionThrown) {
-          cleanUp();
-          switchContainers();
-        }
-      }
-    }
-    finally {
-      reader.close();
-    }
-    importProlog(startTime);
-    return new LDIFImportResult(reader.getEntriesRead(),
-                                reader.getEntriesRejected(),
-                                reader.getEntriesIgnored());
-  }
-
-  /**
-   * Switch containers if the migrated entries were written to the temporary
-   * container.
-   *
-   * @throws DatabaseException If a DB problem occurs.
-   * @throws JebException If a JEB problem occurs.
-   */
-  private void switchContainers() throws DatabaseException, JebException {
-
-    for(DNContext importContext : importMap.values()) {
-      DN baseDN = importContext.getBaseDN();
-      EntryContainer srcEntryContainer =
-              importContext.getSrcEntryContainer();
-      if(srcEntryContainer != null) {
-        if (debugEnabled()) {
-          TRACER.debugInfo("Deleteing old entry container for base DN " +
-                  "%s and renaming temp entry container", baseDN);
-        }
-        EntryContainer unregEC =
-                rootContainer.unregisterEntryContainer(baseDN);
-        //Make sure the unregistered EC for the base DN is the same as
-        //the one in the import context.
-        if(unregEC != srcEntryContainer) {
-          if(debugEnabled()) {
-            TRACER.debugInfo("Current entry container used for base DN " +
-                    "%s is not the same as the source entry container used " +
-                    "during the migration process.", baseDN);
-          }
-          rootContainer.registerEntryContainer(baseDN, unregEC);
-          continue;
-        }
-        srcEntryContainer.lock();
-        srcEntryContainer.close();
-        srcEntryContainer.delete();
-        srcEntryContainer.unlock();
-        EntryContainer newEC = importContext.getEntryContainer();
-        newEC.lock();
-        newEC.setDatabasePrefix(baseDN.toNormalizedString());
-        newEC.unlock();
-        rootContainer.registerEntryContainer(baseDN, newEC);
-      }
-    }
-  }
-
-  /**
-   * Create and log messages at the end of the successful import.
-   *
-   * @param startTime The time the import started.
-   */
-  private void importProlog(long startTime) {
-    Message message;
+    this.reader = new LDIFReader(config, rootContainer, LDIF_READER_BUF_SIZE);
+    Message message =
+            NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
+                    BUILD_ID, REVISION_NUMBER);
+    logError(message);
+    message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
+    logError(message);
+    RuntimeInformation.logInfo();
+    initSuffixes();
+    long startTime = System.currentTimeMillis();
+    processPhaseOne();
+    processPhaseTwo();
+    setIndexesTrusted();
+    tempDir.delete();
     long finishTime = System.currentTimeMillis();
     long importTime = (finishTime - startTime);
-
     float rate = 0;
     if (importTime > 0)
+      rate = 1000f * reader.getEntriesRead() / importTime;
+    message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
+            reader.getEntriesRead(), reader.getEntriesIgnored(), reader
+                    .getEntriesRejected(), 0, importTime / 1000, rate);
+    logError(message);
+    return new LDIFImportResult(reader.getEntriesRead(), reader
+            .getEntriesRejected(), reader.getEntriesIgnored());
+  }
+
+
+  private void setIndexesTrusted() throws JebException
+  {
+    try {
+      for(Suffix s : dnSuffixMap.values()) {
+        s.setIndexesTrusted();
+      }
+    }
+    catch (DatabaseException ex)
     {
-      rate = 1000f*importedCount / importTime;
+      Message msg = NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
+      throw new JebException(msg);
     }
+  }
 
-    message = NOTE_JEB_IMPORT_FINAL_STATUS.
-        get(reader.getEntriesRead(), importedCount,
-            reader.getEntriesIgnored(), reader.getEntriesRejected(),
-            migratedCount, importTime/1000, rate);
-    logError(message);
 
-    message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
-        getEntryLimitExceededCount());
-    logError(message);
+  private void processPhaseOne() throws InterruptedException, ExecutionException
+  {
+    initIndexBuffers(threadCount);
+    FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
+    Timer timer = new Timer();
+    timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
+    indexProcessService = Executors.newFixedThreadPool(2 * indexCount);
+    sortService = Executors.newFixedThreadPool(threadCount);
 
+    //Import tasks are collective tasks.
+    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+    for (int i = 0; i < threadCount; i++)
+    {
+      tasks.add(new ImportTask());
+    }
+    ExecutorService execService = Executors.newFixedThreadPool(threadCount);
+      List<Future<Void>> results = execService.invokeAll(tasks);
+      for (Future<Void> result : results)
+        assert result.isDone();
+    stopIndexWriterTasks();
+    for (Future<?> result : indexWriterFutures)
+    {
+        result.get();
+    }
+    execService.shutdown();
+    freeBufQue.clear();
+    sortService.shutdown();
+    timer.cancel();
+  }
+
+
+
+  private void processPhaseTwo() throws InterruptedException
+  {
+    SecondPhaseProgressTask progress2Task =
+            new SecondPhaseProgressTask(containerIndexMgrMap);
+    Timer timer2 = new Timer();
+    timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
+    processIndexFiles();
+    timer2.cancel();
+  }
+
+
+
+  private void processIndexFiles() throws InterruptedException
+  {
+    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(indexCount);
+    if(bufferCount.get() == 0)
+    {
+      return;
+    }
+    int cacheSize =  cacheSizeFromFreeMemory();
+    int p = 0;
+    int offSet = 0;
+    if(directBuffer != null)
+    {
+      cacheSize = cacheSizeFromDirectMemory();
+    }
+    for(Map.Entry<DatabaseContainer, IndexManager> e :
+            containerIndexMgrMap.entrySet())
+    {
+      DatabaseContainer container = e.getKey();
+      IndexManager indexMgr = e.getValue();
+      boolean isDN2ID = false;
+      if(container instanceof DN2ID)
+      {
+        isDN2ID = true;
+      }
+      if(directBuffer != null)
+      {
+        int cacheSizes = cacheSize * indexMgr.getBufferList().size();
+        offSet += cacheSizes;
+        directBuffer.limit(offSet);
+        directBuffer.position(p);
+        ByteBuffer b = directBuffer.slice();
+        tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, b, cacheSize));
+        p += cacheSizes;
+      }
+      else
+      {
+        tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, cacheSize));
+      }
+    }
+    List<Future<Void>> results = indexProcessService.invokeAll(tasks);
+    for (Future<Void> result : results)
+      assert result.isDone();
+    indexProcessService.shutdown();
+  }
+
+
+  private int cacheSizeFromDirectMemory()
+  {
+    int cap = directBuffer.capacity();
+    int cacheSize = cap/bufferCount.get();
+    if(cacheSize > bufferSize)
+    {
+      cacheSize = bufferSize;
+    }
+    System.out.println("Direct indexes begin Total bufferCount: " +
+            bufferCount.get() + " cacheSize: " + cacheSize);
+    return cacheSize;
+  }
+
+  private int cacheSizeFromFreeMemory()
+  {
+    Runtime runtime = Runtime.getRuntime();
+    long availMemory = runtime.freeMemory()  * MEM_PCT_PHASE_2 / 100;
+    int avgBufSize = (int)(availMemory / bufferCount.get());
+    int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, avgBufSize);
+    if(cacheSize > bufferSize)
+    {
+      cacheSize = bufferSize;
+    }
+    System.out.println("Indirect indexes begin Total bufferCount: " +
+            bufferCount.get() + " avgBufSize: "
+            + avgBufSize + " cacheSize: " + cacheSize);
+    return cacheSize;
+  }
+
+
+  private void stopIndexWriterTasks()
+  {
+    IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
+    for(IndexFileWriterTask task : indexWriterList)
+    {
+      task.que.add(idxBuffer);
+    }
   }
 
 
   /**
-   * Run the cleaner if it is needed.
-   *
-   * @param entriesRead The number of entries read so far.
-   * @param evictEntryNumber The number of entries to run the cleaner after
-   * being read.
-   * @throws DatabaseException If a DB problem occurs.
+   * This task processes the LDIF file during phase 1.
    */
-  private void
-  runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
-          throws DatabaseException {
-    if(!firstClean || (entriesRead %  evictEntryNumber) == 0) {
-      //Make sure work queue is empty before starting.
-      drainWorkQueue();
-      Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get();
-      runCleaner(msg);
-      if(!firstClean) {
-        firstClean=true;
-      }
-    }
-  }
+  private final class ImportTask implements Callable<Void> {
+    private final Map<Suffix, Map<DatabaseContainer, IndexBuffer>> suffixMap =
+            new HashMap<Suffix, Map<DatabaseContainer, IndexBuffer>>();
+    private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
+    private final IndexBuffer.DNComparator dnComparator
+            = new IndexBuffer.DNComparator();
+    private final IndexBuffer.IndexComparator indexComparator =
+            new IndexBuffer.IndexComparator();
 
-  /**
-   * Run the cleaner, pausing the task thread output.
-   *
-   * @param header Message to be printed before cleaning.
-   * @throws DatabaseException If a DB problem occurs.
-   */
-  private void runCleaner(Message header) throws DatabaseException {
-    Message msg;
-    long startTime = System.currentTimeMillis();
-    //Need to force a checkpoint.
-    rootContainer.importForceCheckPoint();
-    logError(header);
-    pTask.setPause(true);
-    //Actually clean the files.
-    int cleaned = rootContainer.cleanedLogFiles();
-    //This checkpoint removes the files if any were cleaned.
-    if(cleaned > 0) {
-      msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
-      logError(msg);
-      rootContainer.importForceCheckPoint();
-    }
-    pTask.setPause(false);
-    long finishTime = System.currentTimeMillis();
-    long cleanTime = (finishTime - startTime) / 1000;
-    msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
-    logError(msg);
-  }
 
-  /**
-   * Process a LDIF reader.
-   *
-   * @throws JebException If a JEB problem occurs.
-   * @throws DatabaseException If a DB problem occurs.
-   * @throws IOException If an IO exception occurs.
-   */
-  private void
-  processLDIF() throws JebException, DatabaseException, IOException {
-    Message message = NOTE_JEB_IMPORT_LDIF_START.get();
-    logError(message);
-    do {
-      if (ldifImportConfig.isCancelled()) {
-        break;
-      }
-      if(threads.size() <= 0) {
-        message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
-        throw new JebException(message);
-      }
-      if(unCaughtExceptionThrown) {
-        abortImport();
-      }
-      try {
-        // Read the next entry.
-        Entry entry = reader.readEntry();
-        // Check for end of file.
-        if (entry == null) {
-          message = NOTE_JEB_IMPORT_LDIF_END.get();
-          logError(message);
+    /**
+     * {@inheritDoc}
+     */
+    public Void call() throws Exception
+    {
+      Suffix suffix = null;
+      while (true)
+      {
+        if (config.isCancelled())
+        {
+          IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
+          freeBufQue.add(idxBuffer);
+          return null;
+        }
+        Entry entry = reader.readEntry(dnSuffixMap);
 
+        if (entry == null)
+        {
           break;
         }
-        // Route it according to base DN.
-        DNContext DNContext = getImportConfig(entry.getDN());
-        processEntry(DNContext, entry);
-        //If the progress task has noticed eviction proceeding, start running
-        //the cleaner.
-        if(pTask.isEvicting()) {
-          runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
-        }
-      }  catch (LDIFException e) {
-        if (debugEnabled()) {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-      } catch (DirectoryException e) {
-        if (debugEnabled()) {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-      } catch (DatabaseException e)  {
-        if (debugEnabled()) {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-      }
-    } while (true);
-  }
-
-  /**
-   * Process an entry using the specified import context.
-   *
-   * @param DNContext The import context.
-   * @param entry The entry to process.
-   */
-  private void processEntry(DNContext DNContext, Entry entry)
-               throws DirectoryException, DatabaseException, JebException {
-    if(worker0Proc < maxWorker0) {
-       DNContext.addPending(entry.getDN());
-       WorkElement element =
-            WorkElement.decode(entry, DNContext);
-        workThread0.process(element);
-        worker0Proc++;
-    } else {
-      //Add this DN to the pending map.
-      DNContext.addPending(entry.getDN());
-      addEntryQueue(DNContext, entry);
-    }
-  }
-
-  /**
-   * Add work item to specified import context's queue.
-   * @param context The import context.
-   * @param item The work item to add.
-   * @return <CODE>True</CODE> if the the work  item was added to the queue.
-   */
-  private boolean
-  addQueue(DNContext context, WorkElement item) {
-    try {
-      while(!context.getWorkQueue().offer(item, 1000,
-                                            TimeUnit.MILLISECONDS)) {
-        if(threads.size() <= 0) {
-          // All worker threads died. We must stop now.
-          return false;
-        }
-      }
-    } catch (InterruptedException e) {
-      if (debugEnabled()) {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      }
-    }
-    return true;
-  }
-
-
-  /**
-   * Wait until the work queue is empty.
-   */
-  private void drainWorkQueue() {
-    if(threads.size() > 0) {
-      for (DNContext context : importMap.values()) {
-        while (context.getWorkQueue().size() > 0) {
-          try {
-            Thread.sleep(100);
-          } catch (Exception e) {
-            // No action needed.
-          }
-        }
-      }
-    }
-  }
-
-  private void abortImport() throws JebException {
-    //Stop work threads telling them to skip substring flush.
-     stopWorkThreads(false);
-     timer.cancel();
-     Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
-     throw new JebException(message);
-  }
-
-  /**
-   * Stop work threads.
-   *
-   * @param abort <CODE>True</CODE> if stop work threads was called from an
-   *              abort.
-   * @throws JebException if a Jeb error occurs.
-   */
-  private void
-  stopWorkThreads(boolean abort) throws JebException {
-    for (WorkThread t : threads) {
-      t.stopProcessing();
-    }
-    // Wait for each thread to stop.
-    for (WorkThread t : threads) {
-      try {
-        if(!abort && unCaughtExceptionThrown) {
-          timer.cancel();
-          Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
-          throw new JebException(message);
-        }
-        t.join();
-        importedCount += t.getImportedCount();
-      } catch (InterruptedException ie) {
-        // No action needed?
-      }
-    }
-  }
-
-  /**
-   * Clean up after a successful import.
-   *
-   * @throws DatabaseException If a DB error occurs.
-   * @throws JebException If a Jeb error occurs.
-   */
-  private void cleanUp() throws DatabaseException, JebException {
-     Message msg;
-    //Drain the work queue.
-    drainWorkQueue();
-    pTask.setPause(true);
-    long startTime = System.currentTimeMillis();
-    stopWorkThreads(true);
-    //Flush the buffer managers.
-    for(DNContext context : importMap.values()) {
-      context.getBufferManager().prepareFlush();
-      context.getBufferManager().flushAll();
-    }
-    long finishTime = System.currentTimeMillis();
-    long flushTime = (finishTime - startTime) / 1000;
-     msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
-    logError(msg);
-    timer.cancel();
-    for(DNContext context : importMap.values()) {
-      context.setIndexesTrusted();
-    }
-    msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
-    //Run the cleaner.
-    runCleaner(msg);
-    closeIndexCursors();
-  }
-
-
-   private void closeIndexCursors() throws DatabaseException {
-    for (DNContext ic : importMap.values())
-    {
-      ic.getEntryContainer().closeIndexCursors();
-    }
-  }
-
-  /**
-   * Uncaught exception handler.
-   *
-   * @param t The thread working when the exception was thrown.
-   * @param e The exception.
-   */
-  public void uncaughtException(Thread t, Throwable e) {
-     unCaughtExceptionThrown = true;
-     threads.remove(t);
-     Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
-         t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
-     logError(msg);
-   }
-
-  /**
-   * Get the entry limit exceeded counts from the indexes.
-   *
-   * @return Count of the index with entry limit exceeded values.
-   */
-  private int getEntryLimitExceededCount() {
-    int count = 0;
-    for (DNContext ic : importMap.values())
-    {
-      count += ic.getEntryContainer().getEntryLimitExceededCount();
-    }
-    return count;
-  }
-
-  /**
-   * Return an import context related to the specified DN.
-   * @param dn The dn.
-   * @return  An import context.
-   * @throws DirectoryException If an directory error occurs.
-   */
-  private DNContext getImportConfig(DN dn) throws DirectoryException {
-    DNContext DNContext = null;
-    DN nodeDN = dn;
-
-    while (DNContext == null && nodeDN != null) {
-      DNContext = importMap.get(nodeDN);
-      if (DNContext == null)
-      {
-        nodeDN = nodeDN.getParentDNInSuffix();
-      }
-    }
-
-    if (nodeDN == null) {
-      // The entry should not have been given to this backend.
-      Message message =
-              JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
-      throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
-    }
-
-    return DNContext;
-  }
-
-  /**
-   * Creates an import context for the specified entry container.
-   *
-   * @param entryContainer The entry container.
-   * @return Import context to use during import.
-   * @throws DatabaseException If a database error occurs.
-   * @throws JebException If a JEB error occurs.
-   * @throws ConfigException If a configuration contains error.
-   */
-   private DNContext getImportContext(EntryContainer entryContainer)
-      throws DatabaseException, JebException, ConfigException {
-    DN baseDN = entryContainer.getBaseDN();
-    EntryContainer srcEntryContainer = null;
-    List<DN> includeBranches = new ArrayList<DN>();
-    List<DN> excludeBranches = new ArrayList<DN>();
-
-    if(!ldifImportConfig.appendToExistingData() &&
-        !ldifImportConfig.clearBackend())
-    {
-      for(DN dn : ldifImportConfig.getExcludeBranches())
-      {
-        if(baseDN.equals(dn))
+        DN entryDN = entry.getDN();
+        EntryID entryID = (EntryID) entry.getAttachment();
+        suffix = getMatchSuffix(entryDN, dnSuffixMap);
+        if(!suffixMap.containsKey(suffix))
         {
-          // This entire base DN was explicitly excluded. Skip.
-          return null;
+          suffixMap.put(suffix, new HashMap<DatabaseContainer, IndexBuffer>());
         }
-        if(baseDN.isAncestorOf(dn))
+        if(!dn2idPhase2)
         {
-          excludeBranches.add(dn);
-        }
-      }
-
-      if(!ldifImportConfig.getIncludeBranches().isEmpty())
-      {
-        for(DN dn : ldifImportConfig.getIncludeBranches())
-        {
-          if(baseDN.isAncestorOf(dn))
+          if(!processParent(entryDN, entryID, entry, suffix))
           {
-            includeBranches.add(dn);
+            suffix.removePending(entryDN);
+            continue;
           }
-        }
-
-        if(includeBranches.isEmpty())
-        {
-          // There are no branches in the explicitly defined include list under
-          // this base DN. Skip this base DN alltogether.
-
-          return null;
-        }
-
-        // Remove any overlapping include branches.
-        Iterator<DN> includeBranchIterator = includeBranches.iterator();
-        while(includeBranchIterator.hasNext())
-        {
-          DN includeDN = includeBranchIterator.next();
-          boolean keep = true;
-          for(DN dn : includeBranches)
+          if(!suffix.getDN2ID().insert(null, entryDN, entryID))
           {
-            if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
-            {
-              keep = false;
-              break;
-            }
+            suffix.removePending(entryDN);
+             Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+             reader.rejectEntry(entry, msg);
+            continue;
           }
-          if(!keep)
-          {
-            includeBranchIterator.remove();
-          }
-        }
-
-        // Remvoe any exclude branches that are not are not under a include
-        // branch since they will be migrated as part of the existing entries
-        // outside of the include branches anyways.
-        Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
-        while(excludeBranchIterator.hasNext())
-        {
-          DN excludeDN = excludeBranchIterator.next();
-          boolean keep = false;
-          for(DN includeDN : includeBranches)
-          {
-            if(includeDN.isAncestorOf(excludeDN))
-            {
-              keep = true;
-              break;
-            }
-          }
-          if(!keep)
-          {
-            excludeBranchIterator.remove();
-          }
-        }
-
-        if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
-            includeBranches.get(0).equals(baseDN))
-        {
-          // This entire base DN is explicitly included in the import with
-          // no exclude branches that we need to migrate. Just clear the entry
-          // container.
-          entryContainer.lock();
-          entryContainer.clear();
-          entryContainer.unlock();
+          suffix.removePending(entryDN);
+          processID2SC(entryID, entry, suffix);
         }
         else
         {
-          // Create a temp entry container
-          srcEntryContainer = entryContainer;
-          entryContainer =
-              rootContainer.openEntryContainer(baseDN,
-                                               baseDN.toNormalizedString() +
-                                                   "_importTmp");
+          processDN2ID(suffix, entryDN, entryID);
+          suffix.removePending(entryDN);
+        }
+        suffix.getID2Entry().put(null, entryID, entry);
+        processIndexes(suffix, entry, entryID);
+      }
+      flushIndexBuffers();
+      if(!dn2idPhase2)
+      {
+        suffix.getEntryContainer().getID2Children().closeCursor();
+        suffix.getEntryContainer().getID2Subtree().closeCursor();
+      }
+      return null;
+    }
+
+
+    private boolean processParent(DN entryDN, EntryID entryID, Entry entry,
+                                  Suffix suffix) throws DatabaseException
+    {
+      EntryID parentID = null;
+      DN parentDN =
+              suffix.getEntryContainer().getParentWithinBase(entryDN);
+      DN2ID dn2id = suffix.getDN2ID();
+      if(dn2id.get(null, entryDN, LockMode.DEFAULT) != null)
+      {
+        Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+        reader.rejectEntry(entry, msg);
+        return false;
+      }
+
+      if (parentDN != null) {
+        parentID = suffix.getParentID(parentDN);
+        if (parentID == null) {
+          dn2id.remove(null, entryDN);
+          Message msg =
+                      ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
+           reader.rejectEntry(entry, msg);
+          return false;
         }
       }
-    }
-
-    // Create an import context.
-    DNContext DNContext = new DNContext();
-    DNContext.setConfig(config);
-    DNContext.setLDIFImportConfig(this.ldifImportConfig);
-    DNContext.setLDIFReader(reader);
-
-    DNContext.setBaseDN(baseDN);
-    DNContext.setEntryContainer(entryContainer);
-    DNContext.setSrcEntryContainer(srcEntryContainer);
-
-    //Create queue.
-    LinkedBlockingQueue<WorkElement> works =
-        new LinkedBlockingQueue<WorkElement>
-                     (config.getImportQueueSize());
-    DNContext.setWorkQueue(works);
-
-    // Set the include and exclude branches
-    DNContext.setIncludeBranches(includeBranches);
-    DNContext.setExcludeBranches(excludeBranches);
-
-    return DNContext;
-  }
-
-  /**
-   * Add specified context and entry to the work queue.
-   *
-   * @param context The context related to the entry DN.
-   * @param entry The entry to work on.
-   * @return  <CODE>True</CODE> if the element was added to the work queue.
-   */
-  private boolean
-  addEntryQueue(DNContext context,  Entry entry) {
-    WorkElement element =
-            WorkElement.decode(entry, context);
-    return addQueue(context, element);
-  }
-
-  /**
-   * Calculate the memory usage for the substring buffer and the DB cache.
-   */
-  private void calcMemoryLimits() {
-    Message msg;
-    Runtime runtime = Runtime.getRuntime();
-    long freeMemory = runtime.freeMemory();
-    long maxMemory = runtime.maxMemory();
-    long totMemory = runtime.totalMemory();
-    long totFreeMemory = (freeMemory + (maxMemory - totMemory));
-    long dbCacheLimit = (totFreeMemory * 60) / 100;
-    //If there are no substring indexes defined, set the DB cache
-    //size to 75% and take a minimal substring buffer.
-    if(!hasSubIndexes) {
-      dbCacheLimit = (totFreeMemory * 75) / 100;
-    }
-    dbCacheSizeStr = Long.toString(dbCacheLimit);
-    totalAvailBufferMemory = (totFreeMemory * 10) / 100;
-    if(totalAvailBufferMemory < (10 * minBuffer)) {
-       msg =
-          NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
-                                                      (10 * minBuffer));
-      logError(msg);
-      totalAvailBufferMemory = (10 * minBuffer);
-    } else if(!hasSubIndexes) {
-      totalAvailBufferMemory = (10 * minBuffer);
-    }
-    msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
-                                             totalAvailBufferMemory);
-    logError(msg);
-  }
-
-  /**
-   * Return the string representation of the DB cache size.
-   *
-   * @return DB cache size string.
-   */
-  public String getDBCacheSize() {
-    return dbCacheSizeStr;
-  }
-
-  /**
-   * Migrate any existing entries.
-   *
-   * @throws JebException If a JEB error occurs.
-   * @throws DatabaseException  If a DB error occurs.
-   * @throws DirectoryException If a directory error occurs.
-   */
-  private void migrateExistingEntries()
-      throws JebException, DatabaseException, DirectoryException {
-    for(DNContext context : importMap.values()) {
-      EntryContainer srcEntryContainer = context.getSrcEntryContainer();
-      if(srcEntryContainer != null &&
-          !context.getIncludeBranches().isEmpty()) {
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
-        LockMode lockMode = LockMode.DEFAULT;
-        OperationStatus status;
-        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
-            "existing", String.valueOf(context.getBaseDN()));
-        logError(message);
-        Cursor cursor =
-            srcEntryContainer.getDN2ID().openCursor(null,
-                                                   CursorConfig.READ_COMMITTED);
-        try {
-          status = cursor.getFirst(key, data, lockMode);
-          while(status == OperationStatus.SUCCESS &&
-                !ldifImportConfig.isCancelled()) {
-            if(threads.size() <= 0) {
-              message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
-              throw new JebException(message);
-            }
-            DN dn = DN.decode(ByteString.wrap(key.getData()));
-            if(!context.getIncludeBranches().contains(dn)) {
-              EntryID id = new EntryID(data);
-              Entry entry =
-                  srcEntryContainer.getID2Entry().get(null,
-                      id, LockMode.DEFAULT);
-              processEntry(context, entry);
-              migratedCount++;
-              status = cursor.getNext(key, data, lockMode);
-            }  else {
-              // This is the base entry for a branch that will be included
-              // in the import so we don't want to copy the branch to the new
-              // entry container.
-
-              /**
-               * Advance the cursor to next entry at the same level in the DIT
-               * skipping all the entries in this branch.
-               * Set the next starting value to a value of equal length but
-               * slightly greater than the previous DN. Since keys are compared
-               * in reverse order we must set the first byte (the comma).
-               * No possibility of overflow here.
-               */
-              byte[] begin =
-                  StaticUtils.getBytes("," + dn.toNormalizedString());
-              begin[0] = (byte) (begin[0] + 1);
-              key.setData(begin);
-              status = cursor.getSearchKeyRange(key, data, lockMode);
+      ArrayList<EntryID> IDs;
+      if (parentDN != null && suffix.getParentDN() != null &&
+              parentDN.equals(suffix.getParentDN())) {
+        IDs = new ArrayList<EntryID>(suffix.getIDs());
+        IDs.set(0, entryID);
+      }
+      else
+      {
+        EntryID nodeID;
+        IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
+        IDs.add(entryID);
+        if (parentID != null)
+        {
+          IDs.add(parentID);
+          EntryContainer ec = suffix.getEntryContainer();
+          for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
+               dn = ec.getParentWithinBase(dn)) {
+            if((nodeID =  getAncestorID(dn2id, dn)) == null) {
+              return false;
+            } else {
+              IDs.add(nodeID);
             }
           }
-        } finally {
-          cursor.close();
         }
       }
+      suffix.setParentDN(parentDN);
+      suffix.setIDs(IDs);
+      entry.setAttachment(IDs);
+      return true;
     }
-  }
+
+    private void processID2SC(EntryID entryID, Entry entry, Suffix suffix)
+            throws DatabaseException
+    {
+      Set<byte[]> childKeySet = new HashSet<byte[]>();
+      Set<byte[]> subtreeKeySet = new HashSet<byte[]>();
+      Index id2children = suffix.getEntryContainer().getID2Children();
+      Index id2subtree = suffix.getEntryContainer().getID2Subtree();
+      id2children.indexer.indexEntry(entry, childKeySet);
+      id2subtree.indexer.indexEntry(entry, subtreeKeySet);
+
+      DatabaseEntry dbKey = new DatabaseEntry();
+      DatabaseEntry dbVal = new DatabaseEntry();
+      ImportIDSet idSet = new ImportIDSet();
+      idSet.addEntryID(entryID, id2children.getIndexEntryLimit(),
+              id2children.getMaintainCount());
+      id2children.insert(idSet, childKeySet, dbKey, dbVal);
+
+      DatabaseEntry dbSubKey = new DatabaseEntry();
+      DatabaseEntry dbSubVal = new DatabaseEntry();
+      ImportIDSet idSubSet = new ImportIDSet();
+      idSubSet.addEntryID(entryID, id2subtree.getIndexEntryLimit(),
+              id2subtree.getMaintainCount());
+      id2subtree.insert(idSubSet, subtreeKeySet, dbSubKey, dbSubVal);
+    }
+
+    private EntryID getAncestorID(DN2ID dn2id, DN dn)
+            throws DatabaseException
+    {
+      int i=0;
+      EntryID nodeID = dn2id.get(null, dn, LockMode.DEFAULT);
+      if(nodeID == null) {
+        while((nodeID = dn2id.get(null, dn, LockMode.DEFAULT)) == null) {
+          try {
+            Thread.sleep(50);
+            if(i == 3) {
+              return null;
+            }
+            i++;
+          } catch (Exception e) {
+            return null;
+          }
+        }
+      }
+      return nodeID;
+    }
 
 
-  /**
-   * Migrate excluded entries.
-   *
-   * @throws JebException If a JEB error occurs.
-   * @throws DatabaseException  If a DB error occurs.
-   * @throws DirectoryException If a directory error occurs.
-   */
-  private void migrateExcludedEntries()
-      throws JebException, DatabaseException, DirectoryException {
-    for(DNContext importContext : importMap.values()) {
-      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
-      if(srcEntryContainer != null &&
-          !importContext.getExcludeBranches().isEmpty()) {
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
-        LockMode lockMode = LockMode.DEFAULT;
-        OperationStatus status;
-        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
-            "excluded", String.valueOf(importContext.getBaseDN()));
-        logError(message);
-        Cursor cursor =
-            srcEntryContainer.getDN2ID().openCursor(null,
-                                                   CursorConfig.READ_COMMITTED);
-        Comparator<byte[]> dn2idComparator =
-            srcEntryContainer.getDN2ID().getComparator();
-        try {
-          for(DN excludedDN : importContext.getExcludeBranches()) {
-            byte[] suffix =
-                StaticUtils.getBytes(excludedDN.toNormalizedString());
-            key.setData(suffix);
-            status = cursor.getSearchKeyRange(key, data, lockMode);
-            if(status == OperationStatus.SUCCESS &&
-                Arrays.equals(key.getData(), suffix)) {
-              // This is the base entry for a branch that was excluded in the
-              // import so we must migrate all entries in this branch over to
-              // the new entry container.
-              byte[] end =
-                  StaticUtils.getBytes("," + excludedDN.toNormalizedString());
-              end[0] = (byte) (end[0] + 1);
 
-              while(status == OperationStatus.SUCCESS &&
-                  dn2idComparator.compare(key.getData(), end) < 0 &&
-                  !ldifImportConfig.isCancelled()) {
-                if(threads.size() <= 0) {
-                  message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
-                  throw new JebException(message);
-                }
-                EntryID id = new EntryID(data);
-                Entry entry = srcEntryContainer.getID2Entry().get(null,
-                    id, LockMode.DEFAULT);
-                processEntry(importContext, entry);
-                migratedCount++;
-                status = cursor.getNext(key, data, lockMode);
+    private void
+    processIndexes(Suffix ctx, Entry entry, EntryID entryID) throws
+            DatabaseException, DirectoryException, JebException, ConfigException
+    {
+      Transaction txn = null;
+      Map<AttributeType, AttributeIndex> attrMap = ctx.getAttrIndexMap();
+      for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
+              attrMap.entrySet()) {
+        AttributeType attrType = mapEntry.getKey();
+        if(entry.hasAttribute(attrType)) {
+          AttributeIndex attributeIndex = mapEntry.getValue();
+          Index index;
+          if((index=attributeIndex.getEqualityIndex()) != null) {
+            indexAttr(ctx, index, entry, entryID);
+          }
+          if((index=attributeIndex.getPresenceIndex()) != null) {
+            indexAttr(ctx, index, entry, entryID);
+          }
+          if((index=attributeIndex.getSubstringIndex()) != null) {
+            indexAttr(ctx, index, entry, entryID);
+          }
+          if((index=attributeIndex.getOrderingIndex()) != null) {
+            indexAttr(ctx, index, entry, entryID);
+          }
+          if((index=attributeIndex.getApproximateIndex()) != null) {
+            indexAttr(ctx, index, entry, entryID);
+          }
+          for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) {
+            vlvIdx.addEntry(txn, entryID, entry);
+          }
+          Map<String,Collection<Index>> extensibleMap =
+                  attributeIndex.getExtensibleIndexes();
+          if(!extensibleMap.isEmpty()) {
+            Collection<Index> subIndexes =
+                    attributeIndex.getExtensibleIndexes().get(
+                            EXTENSIBLE_INDEXER_ID_SUBSTRING);
+            if(subIndexes != null) {
+              for(Index subIndex: subIndexes) {
+                indexAttr(ctx, subIndex, entry, entryID);
+              }
+            }
+            Collection<Index> sharedIndexes =
+                    attributeIndex.getExtensibleIndexes().get(
+                            EXTENSIBLE_INDEXER_ID_SHARED);
+            if(sharedIndexes !=null) {
+              for(Index sharedIndex:sharedIndexes) {
+                indexAttr(ctx, sharedIndex, entry, entryID);
               }
             }
           }
         }
-        finally
+      }
+    }
+
+
+
+    private void indexAttr(Suffix ctx, Index index, Entry entry,
+                           EntryID entryID)
+            throws DatabaseException, ConfigException
+    {
+      insertKeySet.clear();
+      index.indexer.indexEntry(entry, insertKeySet);
+      for(byte[] key : insertKeySet)
+      {
+        processKey(ctx, index, key, entryID, indexComparator, null);
+      }
+    }
+
+
+    private void flushIndexBuffers() throws InterruptedException,
+                 ExecutionException
+    {
+      Iterator<Suffix> i  = dnSuffixMap.values().iterator();
+      Suffix suffix = i.next();
+      for(Map<DatabaseContainer, IndexBuffer> map : suffixMap.values())
+      {
+        for(Map.Entry<DatabaseContainer, IndexBuffer> e : map.entrySet())
         {
-          cursor.close();
+          DatabaseContainer container = e.getKey();
+          IndexBuffer indexBuffer = e.getValue();
+          if(container instanceof DN2ID)
+          {
+            indexBuffer.setComparator(dnComparator);
+          }
+          else
+          {
+            indexBuffer.setComparator(indexComparator);
+          }
+          indexBuffer.setContainer(container);
+          indexBuffer.setEntryContainer(suffix.getEntryContainer());
+          Future<Void> future = sortService.submit(new SortTask(indexBuffer));
+          future.get();
         }
       }
     }
+
+
+    private void
+    processKey(Suffix ctx, DatabaseContainer container, byte[] key,
+               EntryID entryID,IndexBuffer.ComparatorBuffer<byte[]> comparator,
+               EntryContainer entryContainer) throws ConfigException
+    {
+      IndexBuffer indexBuffer;
+      Map<DatabaseContainer, IndexBuffer> conMap = suffixMap.get(ctx);
+      if(!conMap.containsKey(container))
+      {
+        indexBuffer = getNewIndexBuffer();
+        conMap.put(container, indexBuffer);
+      }
+      else
+      {
+        indexBuffer = conMap.get(container);
+      }
+      if(!indexBuffer.isSpaceAvailable(key))
+      {
+        indexBuffer.setContainer(container);
+        indexBuffer.setComparator(comparator);
+        indexBuffer.setEntryContainer(entryContainer);
+        sortService.submit(new SortTask(indexBuffer));
+        indexBuffer = getNewIndexBuffer();
+        conMap.remove(container);
+        conMap.put(container, indexBuffer);
+      }
+      indexBuffer.add(key, entryID);
+    }
+
+
+    private IndexBuffer getNewIndexBuffer() throws ConfigException
+    {
+      IndexBuffer indexBuffer = freeBufQue.poll();
+      if(indexBuffer.isPoison())
+      {
+        Message msg = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
+                "Abort import - MPD");
+        throw new ConfigException(msg);
+      }
+      return indexBuffer;
+    }
+
+
+    private void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
+            throws ConfigException
+    {
+      DatabaseContainer dn2id = suffix.getDN2ID();
+      byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
+      processKey(suffix, dn2id, dnBytes, entryID, dnComparator,
+              suffix.getEntryContainer());
+
+    }
+  }
+
+  /**
+   * The task reads the temporary index files and writes their results to the
+   * index database.
+   */
+  private final class IndexWriteDBTask implements Callable<Void> {
+
+    private final IndexManager indexMgr;
+    private final boolean isDN2ID;
+    private final DatabaseEntry dbKey, dbValue;
+    private final DN2ID dn2id;
+    private final Index index;
+
+    private final EntryContainer entryContainer;
+    private final int id2ChildLimit;
+    private final boolean id2ChildMCount;
+
+    private TreeMap<DN,EntryID> parentIDMap = new TreeMap<DN,EntryID>();
+    private DN parentDN, lastDN;
+    private EntryID parentID, lastID;
+    private final Map<byte[], ImportIDSet> id2childTree;
+    private final Map<byte[], ImportIDSet> id2subtreeTree;
+    private final int cacheSize;
+    private ByteBuffer directBuffer = null;
+
+    public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID,
+                            ByteBuffer b, int cacheSize)
+    {
+      this(indexMgr, isDN2ID, cacheSize);
+      directBuffer = b;
+    }
+
+    public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID,
+                            int cacheSize)
+    {
+      this.indexMgr = indexMgr;
+      this.entryContainer = indexMgr.entryContainer;
+      this.isDN2ID = isDN2ID;
+      this.dbKey = new DatabaseEntry();
+      this.dbValue = new DatabaseEntry();
+      this.cacheSize = cacheSize;
+      if(isDN2ID)
+      {
+        this.dn2id = indexMgr.dn2id;
+        this.index = null;
+        id2ChildLimit = entryContainer.getID2Children().getIndexEntryLimit();
+        id2ChildMCount = entryContainer.getID2Subtree().getMaintainCount();
+        Comparator<byte[]> id2ChildComparator =
+                entryContainer.getID2Children().getComparator();
+        Comparator<byte[]> id2SubtreeComparator =
+                entryContainer.getID2Subtree().getComparator();
+        id2childTree =
+                new TreeMap<byte[], ImportIDSet>(id2ChildComparator);
+        id2subtreeTree =
+                new TreeMap<byte[], ImportIDSet>(id2SubtreeComparator);
+      }
+      else
+      {
+        this.dn2id = null;
+        this.index = indexMgr.getIndex();
+        id2subtreeTree = null;
+        id2childTree = null;
+        id2ChildLimit = 0;
+        id2ChildMCount = false;
+      }
+    }
+
+
+    public Void call() throws Exception
+    {
+
+      Comparator<byte[]> comparator = indexMgr.getComparator();
+      int limit = indexMgr.getLimit();
+      boolean maintainCount = indexMgr.getMaintainCount();
+      byte[] cKey = null;
+      ImportIDSet cIDSet = null;
+      indexMgr.init();
+      List<Buffer> bufferList = indexMgr.getBufferList();
+      SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
+      int p = 0;
+      int offSet = cacheSize;
+      for(Buffer b : bufferList)
+      {
+        if(directBuffer != null)
+        {
+          directBuffer.position(p);
+          directBuffer.limit(offSet);
+          ByteBuffer slice = directBuffer.slice();
+          b.init(indexMgr, slice, cacheSize);
+          p += cacheSize;
+          offSet += cacheSize;
+        }
+        else
+        {
+          b.init(indexMgr, null, cacheSize);
+        }
+        bufferSet.add(b);
+      }
+      while(!bufferSet.isEmpty())
+      {
+        Buffer b;
+        b = bufferSet.first();
+        if(b == null) {
+          System.out.println("null b");
+        }
+        bufferSet.remove(b);
+        byte[] key = b.getKey();
+        ImportIDSet idSet = b.getIDSet();
+        if(cKey == null)
+        {
+          cKey = key;
+          cIDSet = idSet;
+        }
+        else
+        {
+          if(comparator.compare(key, cKey) != 0)
+          {
+            addToDB(cKey, cIDSet);
+            indexMgr.incrKeyCount();
+            cKey = key;
+            cIDSet = idSet;
+          }
+          else
+          {
+            cIDSet.setKey(cKey);
+            cIDSet.merge(idSet, limit, maintainCount);
+          }
+        }
+        if(b.hasMoreData())
+        {
+          b.getNextRecord();
+          bufferSet.add(b);
+        }
+      }
+      if(cKey != null)
+      {
+        addToDB(cKey, cIDSet);
+      }
+      cleanUP();
+      return null;
+    }
+
+
+    private void cleanUP() throws DatabaseException, DirectoryException,
+            IOException
+    {
+      if(!isDN2ID) {
+        index.closeCursor();
+        Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(index.getName());
+        logError(msg);
+
+      }
+      else
+      {
+        if(dn2idPhase2)
+        {
+          flushSubTreeChildIndexes();
+        }
+      }
+      indexMgr.setDone();
+      indexMgr.close();
+      indexMgr.deleteIndexFile();
+    }
+
+
+    private void flushSubTreeChildIndexes()
+            throws DatabaseException, DirectoryException
+    {
+      Index  id2child = entryContainer.getID2Children();
+      Set<Map.Entry<byte[], ImportIDSet>> id2childSet =
+              id2childTree.entrySet();
+      for(Map.Entry<byte[], ImportIDSet> e : id2childSet)
+      {
+        byte[] key = e.getKey();
+        ImportIDSet idSet = e.getValue();
+        dbKey.setData(key);
+        id2child.insert(dbKey, idSet, dbValue);
+      }
+      id2child.closeCursor();
+      Index id2subtree = entryContainer.getID2Subtree();
+      Set<Map.Entry<byte[], ImportIDSet>> subtreeSet =
+              id2subtreeTree.entrySet();
+      for(Map.Entry<byte[], ImportIDSet> e : subtreeSet)
+      {
+        byte[] key = e.getKey();
+        ImportIDSet idSet = e.getValue();
+        dbKey.setData(key);
+        id2subtree.insert(dbKey, idSet, dbValue);
+      }
+      id2subtree.closeCursor();
+      Message msg =
+             NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getTotDNCount());
+      logError(msg);
+    }
+
+
+    private void addToDB(byte[] key, ImportIDSet record)
+            throws InterruptedException, DatabaseException, DirectoryException
+    {
+      record.setKey(key);
+      if(!this.isDN2ID)
+      {
+        addIndex(record);
+      }
+      else
+      {
+        if(dn2idPhase2)
+        {
+          addDN2ID(record);
+        }
+      }
+    }
+
+
+    private void id2Subtree(EntryContainer ec, EntryID childID,
+                            int limit, boolean mCount) throws DatabaseException
+    {
+      ImportIDSet idSet;
+      if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
+      {
+        idSet = new ImportIDSet();
+        id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
+      }
+      else
+      {
+        idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
+      }
+      idSet.addEntryID(childID, limit, mCount);
+      for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
+           dn = ec.getParentWithinBase(dn))
+      {
+        EntryID nodeID = parentIDMap.get(dn);
+        if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
+        {
+          idSet = new ImportIDSet();
+          id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
+        }
+        else
+        {
+          idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
+        }
+        idSet.addEntryID(childID, limit, mCount);
+      }
+    }
+
+    private void id2child(EntryID childID, int limit, boolean mCount)
+    {
+      ImportIDSet idSet;
+      if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
+      {
+        idSet = new ImportIDSet();
+        id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
+      }
+      else
+      {
+        idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
+      }
+      idSet.addEntryID(childID, limit, mCount);
+    }
+
+    private boolean checkParent(DN dn, EntryID id, EntryContainer ec)
+    {
+      if(parentIDMap.isEmpty())
+      {
+        parentIDMap.put(dn, id);
+        return true;
+      }
+      else if(lastDN != null && lastDN.isAncestorOf(dn))
+      {
+        parentIDMap.put(lastDN, lastID);
+        parentDN = lastDN;
+        parentID = lastID;
+        lastDN = dn;
+        lastID = id;
+        return true;
+      }
+      else if(parentIDMap.lastKey().isAncestorOf(dn))
+      {
+        parentDN = parentIDMap.lastKey();
+        parentID = parentIDMap.get(parentDN);
+        lastDN = dn;
+        lastID = id;
+        return true;
+      }
+      else
+      {
+        DN pDN = ec.getParentWithinBase(dn);
+        if(parentIDMap.containsKey(pDN)) {
+          DN lastKey = parentIDMap.lastKey();
+          Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey);
+          for(Map.Entry<DN, EntryID> e : subMap.entrySet())
+          {
+            subMap.remove(e.getKey());
+          }
+          parentDN = pDN;
+          parentID = parentIDMap.get(pDN);
+          lastDN = dn;
+          lastID = id;
+        }
+        else
+        {
+          Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString());
+          Entry e = new Entry(dn, null, null, null);
+          reader.rejectEntry(e, msg);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void addDN2ID(ImportIDSet record)
+            throws DatabaseException, DirectoryException
+    {
+      DatabaseEntry idVal = new DatabaseEntry();
+      dbKey.setData(record.getKey());
+      idVal.setData(record.toDatabase());
+      DN dn = DN.decode(ByteString.wrap(dbKey.getData()));
+      EntryID entryID = new EntryID(idVal);
+      if(!checkParent(dn, entryID, entryContainer))
+      {
+        return;
+      }
+      dn2id.putRaw(null, dbKey, idVal);
+      indexMgr.addTotDNCount(1);
+      if(parentDN != null)
+      {
+        id2child(entryID, id2ChildLimit, id2ChildMCount);
+        id2Subtree(entryContainer,
+                entryID, id2ChildLimit, id2ChildMCount);
+      }
+    }
+
+
+    private void addIndex(ImportIDSet record) throws DatabaseException
+    {
+      dbKey.setData(record.getKey());
+      index.insert(dbKey, record, dbValue);
+    }
   }
 
 
   /**
+   * This task writes the temporary index files using the sorted buffers read
+   * from a blocking queue.
+   */
+  private final class IndexFileWriterTask implements Runnable
+  {
+    private final IndexManager indexMgr;
+    private final BlockingQueue<IndexBuffer> que;
+    private final ByteArrayOutputStream byteStream =
+            new ByteArrayOutputStream(2 * bufferSize);
+    private final DataOutputStream dataStream;
+    private long bufCount = 0;
+    private final File file;
+    private final SortedSet<IndexBuffer> indexSortedSet;
+    private boolean poisonSeen = false;
+
+    public IndexFileWriterTask(BlockingQueue<IndexBuffer> que,
+                            IndexManager indexMgr) throws FileNotFoundException
+    {
+      this.que = que;
+      file = indexMgr.getFile();
+      this.indexMgr = indexMgr;
+      BufferedOutputStream bufferedStream =
+                   new BufferedOutputStream(new FileOutputStream(file), 2 * MB);
+      dataStream = new DataOutputStream(bufferedStream);
+      indexSortedSet = new TreeSet<IndexBuffer>();
+    }
+
+
+    public void run()
+    {
+      long offset = 0;
+      List<IndexBuffer> l = new LinkedList<IndexBuffer>();
+      try {
+        while(true)
+        {
+          IndexBuffer indexBuffer = que.poll();
+          if(indexBuffer != null)
+          {
+            long beginOffset = offset;
+            long bufLen;
+            if(!que.isEmpty())
+            {
+              que.drainTo(l, DRAIN_TO);
+              l.add(indexBuffer);
+              bufLen = writeIndexBuffers(l);
+              for(IndexBuffer id : l)
+              {
+                id.reset();
+              }
+              freeBufQue.addAll(l);
+              l.clear();
+              if(poisonSeen)
+              {
+                break;
+              }
+            }
+            else
+            {
+              if(indexBuffer.isPoison())
+              {
+                break;
+              }
+              bufLen = writeIndexBuffer(indexBuffer);
+              indexBuffer.reset();
+              freeBufQue.add(indexBuffer);
+            }
+            offset += bufLen;
+            indexMgr.addBuffer(new Buffer(beginOffset, offset, bufCount));
+            bufCount++;
+            bufferCount.incrementAndGet();
+          }
+        }
+        dataStream.close();
+        indexMgr.setFileLength();
+      }
+      catch (IOException e) {
+        Message msg =
+                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
+                                    e.getMessage());
+        logError(msg);
+      }
+    }
+
+
+    private long writeIndexBuffer(IndexBuffer indexBuffer) throws IOException
+    {
+      int numKeys = indexBuffer.getNumberKeys();
+      indexBuffer.setPos(-1);
+      long bufLen = 0;
+      byteStream.reset();
+      for(int i = 0; i < numKeys; i++)
+      {
+        if(indexBuffer.getPos() == -1)
+        {
+          indexBuffer.setPos(i);
+          byteStream.write(indexBuffer.getID(i));
+          continue;
+        }
+
+        if(!indexBuffer.compare(i))
+        {
+          int recLen = indexBuffer.getKeySize();
+          recLen += byteStream.size();
+          recLen += 8;
+          bufLen += recLen;
+          indexBuffer.writeKey(dataStream);
+          dataStream.writeInt(byteStream.size());
+          byteStream.writeTo(dataStream);
+          indexBuffer.setPos(i);
+          byteStream.reset();
+        }
+        byteStream.write(indexBuffer.getID(i));
+      }
+
+      if(indexBuffer.getPos() != -1)
+      {
+        int recLen = indexBuffer.getKeySize();
+        recLen += byteStream.size();
+        recLen += 8;
+        bufLen += recLen;
+        indexBuffer.writeKey(dataStream);
+        dataStream.writeInt(byteStream.size());
+        byteStream.writeTo(dataStream);
+      }
+      return bufLen;
+    }
+
+
+    private long writeIndexBuffers(List<IndexBuffer> buffers)
+            throws IOException
+    {
+      long id = 0;
+      long bufLen = 0;
+      byteStream.reset();
+      for(IndexBuffer b : buffers)
+      {
+        if(b.isPoison())
+        {
+          poisonSeen = true;
+        }
+        else
+        {
+          b.setPos(0);
+          b.setID(id++);
+          indexSortedSet.add(b);
+        }
+      }
+      byte[] saveKey = null;
+      while(!indexSortedSet.isEmpty())
+      {
+        IndexBuffer b = indexSortedSet.first();
+        indexSortedSet.remove(b);
+        byte[] key = b.getKeyBytes(b.getPos());
+        if(saveKey == null)
+        {
+          saveKey = key;
+          byteStream.write(b.getID(b.getPos()));
+        }
+        else
+        {
+          if(!b.compare(saveKey))
+          {
+            int recLen = saveKey.length;
+            recLen += byteStream.size();
+            recLen += 8;
+            bufLen += recLen;
+            dataStream.writeInt(saveKey.length);
+            dataStream.write(saveKey);
+            dataStream.writeInt(byteStream.size());
+            byteStream.writeTo(dataStream);
+            byteStream.reset();
+            saveKey = key;
+            byteStream.write(b.getID(b.getPos()));
+          }
+          else
+          {
+            byteStream.write(b.getID(b.getPos()));
+          }
+        }
+        if(b.hasMoreData())
+        {
+          b.getNextRecord();
+          indexSortedSet.add(b);
+        }
+      }
+      if(saveKey != null)
+      {
+        int recLen = saveKey.length;
+        recLen += byteStream.size();
+        recLen += 8;
+        bufLen += recLen;
+        dataStream.writeInt(saveKey.length);
+        dataStream.write(saveKey);
+        dataStream.writeInt(byteStream.size());
+        byteStream.writeTo(dataStream);
+      }
+      return bufLen;
+    }
+  }
+
+  /**
+   * This task main function is to sort the index buffers given to it from
+   * the import tasks reading the LDIF file. It will also create a index
+   * file writer task and corresponding queue if needed. The sorted index
+   * buffers are put on the index file writer queues for writing to a temporary
+   * file.
+   */
+  private final class SortTask implements Callable<Void>
+  {
+
+    private final IndexBuffer indexBuffer;
+
+    public SortTask(IndexBuffer indexBuffer)
+    {
+      this.indexBuffer = indexBuffer;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Void call() throws Exception
+    {
+      if (config.isCancelled())
+      {
+        return null;
+      }
+      indexBuffer.sort();
+      if(containerQueMap.containsKey(indexBuffer.getContainer())) {
+        BlockingQueue<IndexBuffer> q =
+                containerQueMap.get(indexBuffer.getContainer());
+        q.add(indexBuffer);
+      }
+      else
+      {
+        DatabaseContainer container = indexBuffer.getContainer();
+        EntryContainer entryContainer = indexBuffer.getEntryContainer();
+        createIndexWriterTask(container, entryContainer);
+        BlockingQueue<IndexBuffer> q = containerQueMap.get(container);
+        q.add(indexBuffer);
+      }
+      return null;
+    }
+
+    private void createIndexWriterTask(DatabaseContainer container,
+                                       EntryContainer entryContainer)
+    throws FileNotFoundException
+    {
+      synchronized(container) {
+        if(containerQueMap.containsKey(container))
+        {
+          return;
+        }
+        IndexManager indexMgr;
+        if(container instanceof Index)
+        {
+          Index index = (Index) container;
+          indexMgr = new IndexManager(index);
+        }
+        else
+        {
+          DN2ID dn2id = (DN2ID) container;
+          indexMgr = new IndexManager(dn2id, entryContainer);
+        }
+        containerIndexMgrMap.put(container, indexMgr);
+        BlockingQueue<IndexBuffer> newQue =
+                new ArrayBlockingQueue<IndexBuffer>(threadCount + 5);
+        IndexFileWriterTask indexWriter =
+                new IndexFileWriterTask(newQue, indexMgr);
+        indexWriterList.add(indexWriter);
+        indexWriterFutures.add(indexProcessService.submit(indexWriter));
+        containerQueMap.put(container, newQue);
+      }
+    }
+  }
+
+  /**
+   * The buffer class is used to process a buffer from the temporary index files
+   * during phase 2 processing.
+   */
+  private final class Buffer implements Comparable<Buffer>
+  {
+    private IndexManager indexMgr;
+    private final long begin, end, id;
+    private long offset;
+    private ByteBuffer cache;
+    private int keyLen, idLen;
+    private byte[] key;
+    private ImportIDSet idSet;
+
+
+    public Buffer(long begin, long end, long id)
+    {
+      this.begin = begin;
+      this.end = end;
+      this.offset = 0;
+      this.id = id;
+    }
+
+
+    private void init(IndexManager indexMgr, ByteBuffer b,
+                      long cacheSize) throws IOException
+    {
+      this.indexMgr = indexMgr;
+      if(b == null)
+      {
+        cache = ByteBuffer.allocate((int)cacheSize);
+      }
+      else
+      {
+        cache = b;
+      }
+      loadCache();
+      cache.flip();
+      getNextRecord();
+    }
+
+
+    private void loadCache() throws IOException
+    {
+      FileChannel fileChannel = indexMgr.getChannel();
+      fileChannel.position(begin + offset);
+      long leftToRead =  end - (begin + offset);
+      long bytesToRead;
+      if(leftToRead < cache.remaining())
+      {
+        int pos = cache.position();
+        cache.limit((int) (pos + leftToRead));
+        bytesToRead = (int)leftToRead;
+      }
+      else
+      {
+        bytesToRead = Math.min((end - offset),cache.remaining());
+      }
+      int bytesRead = 0;
+      while(bytesRead < bytesToRead)
+      {
+        bytesRead += fileChannel.read(cache);
+      }
+      offset += bytesRead;
+      indexMgr.addBytesRead(bytesRead);
+    }
+
+    public boolean hasMoreData() throws IOException
+    {
+      boolean ret = ((begin + offset) >= end) ? true: false;
+      if(cache.remaining() == 0 && ret)
+      {
+        return false;
+      }
+      else
+      {
+        return true;
+      }
+    }
+
+    public byte[] getKey()
+    {
+      return key;
+    }
+
+    public ImportIDSet getIDSet()
+    {
+      return idSet;
+    }
+
+    public long getBufID()
+    {
+      return id;
+    }
+
+    public void getNextRecord()  throws IOException
+    {
+      getNextKey();
+      getNextIDSet();
+    }
+
+    private int getInt()  throws IOException
+    {
+      ensureData(4);
+      return cache.getInt();
+    }
+
+    private long getLong()  throws IOException
+    {
+      ensureData(8);
+      return cache.getLong();
+    }
+
+    private void getBytes(byte[] b) throws IOException
+    {
+      ensureData(b.length);
+      cache.get(b);
+    }
+
+    private void getNextKey() throws IOException, BufferUnderflowException
+    {
+      keyLen = getInt();
+      key = new byte[keyLen];
+        getBytes(key);
+    }
+
+
+    private void getNextIDSet() throws IOException, BufferUnderflowException
+    {
+      idLen = getInt();
+      int idCount = idLen/8;
+      idSet = new ImportIDSet(idCount);
+      for(int i = 0; i < idCount; i++)
+      {
+        long l = getLong();
+        idSet.addEntryID(l, indexMgr.getLimit(), indexMgr.getMaintainCount());
+      }
+    }
+
+
+    private void ensureData(int len) throws IOException
+    {
+      if(cache.remaining() == 0)
+      {
+        cache.clear();
+        loadCache();
+        cache.flip();
+      }
+      else if(cache.remaining() < len)
+      {
+        cache.compact();
+        loadCache();
+        cache.flip();
+      }
+    }
+
+    public int compareTo(Buffer o) {
+      if(key == null) {
+        if(id == o.getBufID())
+        {
+          return 0;
+        }
+        else
+        {
+          return id > o.getBufID() ? 1 : -1;
+        }
+      }
+      if(this.equals(o))
+      {
+        return 0;
+      }
+      int rc = indexMgr.getComparator().compare(key, o.getKey());
+      if(rc == 0)
+      {
+        if(idSet.isDefined())
+        {
+          return -1;
+        }
+        else if(o.getIDSet().isDefined())
+        {
+          return 1;
+        }
+        else if(idSet.size() == o.getIDSet().size())
+        {
+          rc = id > o.getBufID() ? 1 : -1;
+        }
+        else
+        {
+          rc = idSet.size() - o.getIDSet().size();
+        }
+      }
+      return rc;
+    }
+  }
+
+  /**
+   * The index manager class is used to carry information about index processing
+   * from phase 1 to phase 2.
+   */
+  private final class IndexManager
+  {
+    private final Index index;
+    private final DN2ID dn2id;
+    private final EntryContainer entryContainer;
+    private final File file;
+
+
+    private RandomAccessFile raf = null;
+    private final List<Buffer> bufferList = new LinkedList<Buffer>();
+    private final int limit;
+    private long fileLength, bytesRead = 0;
+    private final boolean maintainCount;
+    private final Comparator<byte[]> comparator;
+    private boolean done = false;
+    private long totalDNS;
+    private AtomicInteger keyCount = new AtomicInteger(0);
+    private final String name;
+
+    public IndexManager(Index index)
+    {
+      this.index = index;
+      dn2id = null;
+      file = new File(tempDir, index.getName());
+      name = index.getName();
+      limit = index.getIndexEntryLimit();
+      maintainCount = index.getMaintainCount();
+      comparator = index.getComparator();
+      entryContainer = null;
+    }
+
+
+    public IndexManager(DN2ID dn2id, EntryContainer entryContainer)
+    {
+      index = null;
+      this.dn2id = dn2id;
+      file = new File(tempDir, dn2id.getName());
+      limit = 1;
+      maintainCount = false;
+      comparator = dn2id.getComparator();
+      this.entryContainer = entryContainer;
+      name = dn2id.getName();
+    }
+
+    public void init() throws FileNotFoundException
+    {
+      raf = new RandomAccessFile(file, "r");
+    }
+
+    public FileChannel getChannel()
+    {
+      return raf.getChannel();
+    }
+
+    public void addBuffer(Buffer o)
+    {
+      this.bufferList.add(o);
+    }
+
+    public List<Buffer> getBufferList()
+    {
+      return bufferList;
+    }
+
+    public File getFile()
+    {
+      return file;
+    }
+
+    public void deleteIndexFile()
+    {
+       file.delete();
+    }
+
+    public void close() throws IOException
+    {
+        raf.close();
+    }
+
+    public int getLimit()
+    {
+      return limit;
+    }
+
+    public boolean getMaintainCount()
+    {
+      return maintainCount;
+    }
+
+    public Comparator<byte[]> getComparator()
+    {
+      return comparator;
+    }
+
+    public Index getIndex()
+    {
+      return index;
+    }
+
+    public void setFileLength()
+    {
+      this.fileLength = file.length();
+    }
+
+    public void addBytesRead(int bytesRead)
+    {
+      this.bytesRead += bytesRead;
+    }
+
+    public void setDone()
+    {
+      this.done = true;
+    }
+
+    public void addTotDNCount(int delta)
+    {
+      this.totalDNS += delta;
+    }
+
+
+    public long getTotDNCount()
+    {
+      return totalDNS;
+    }
+
+
+    public void printStats(long deltaTime)
+    {
+      if(!done)
+      {
+        float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
+        Message msg = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(name,
+                       (fileLength - bytesRead), rate);
+        logError(msg);
+      }
+    }
+
+    public void incrKeyCount()
+    {
+      keyCount.incrementAndGet();
+    }
+  }
+
+  /**
    * This class reports progress of the import job at fixed intervals.
    */
-  private final class ProgressTask extends TimerTask
+  private final class FirstPhaseProgressTask extends TimerTask
   {
     /**
      * The number of entries that had been read at the time of the
@@ -993,89 +1795,72 @@
     private EnvironmentStats prevEnvStats;
 
     /**
-     * The number of bytes in a megabyte.
-     * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
+     * The number of bytes in a megabyte. Note that 1024*1024 bytes may
+     * eventually become known as a mebibyte(MiB).
      */
-    public static final int bytesPerMegabyte = 1024*1024;
+    public static final int bytesPerMegabyte = 1024 * 1024;
 
-    //Determines if the ldif is being read.
+    // Determines if the ldif is being read.
     private boolean ldifRead = false;
 
-    //Determines if eviction has been detected.
+    // Determines if eviction has been detected.
     private boolean evicting = false;
 
-    //Entry count when eviction was detected.
+    // Entry count when eviction was detected.
     private long evictionEntryCount = 0;
 
-    //Suspend output.
+    // Suspend output.
     private boolean pause = false;
 
+
+
     /**
      * Create a new import progress task.
-     * @throws DatabaseException If an error occurs in the JE database.
      */
-    public ProgressTask() throws DatabaseException
+    public FirstPhaseProgressTask()
     {
       previousTime = System.currentTimeMillis();
-      prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
+      try
+      {
+        prevEnvStats =
+                rootContainer.getEnvironmentStats(new StatsConfig());
+      }
+      catch (DatabaseException e)
+      {
+        throw new RuntimeException(e);
+      }
     }
 
-    /**
-     * Return if reading the LDIF file.
-     */
-    public void ldifRead() {
-      ldifRead=true;
-    }
 
-   /**
-    * Return value of evicting flag.
-    *
-    * @return <CODE>True</CODE> if eviction is detected.
-    */
-    public  boolean isEvicting() {
-     return evicting;
-    }
-
-    /**
-     * Return count of entries when eviction was detected.
-     *
-     * @return  The entry count when eviction was detected.
-     */
-    public long getEvictionEntryCount() {
-      return evictionEntryCount;
-    }
-
-    /**
-     * Suspend output if true.
-     *
-     * @param v The value to set the suspend value to.
-     */
-    public void setPause(boolean v) {
-    pause=v;
-   }
 
     /**
      * The action to be performed by this timer task.
      */
-    public void run() {
+    @Override
+    public void run()
+    {
       long latestCount = reader.getEntriesRead() + 0;
       long deltaCount = (latestCount - previousCount);
       long latestTime = System.currentTimeMillis();
       long deltaTime = latestTime - previousTime;
       Message message;
-      if (deltaTime == 0) {
+      if (deltaTime == 0)
+      {
         return;
       }
-      if(pause) {
+      if (pause)
+      {
         return;
       }
-      if(!ldifRead) {
-        long numRead     = reader.getEntriesRead();
-        long numIgnored  = reader.getEntriesIgnored();
+      if (!ldifRead)
+      {
+        long numRead = reader.getEntriesRead();
+        long numIgnored = reader.getEntriesIgnored();
         long numRejected = reader.getEntriesRejected();
-         float rate = 1000f*deltaCount / deltaTime;
-         message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(
-            numRead, numIgnored, numRejected, 0, rate);
+        float rate = 1000f * deltaCount / deltaTime;
+        message =
+                NOTE_JEB_IMPORT_PROGRESS_REPORT.get(numRead, numIgnored,
+                        numRejected, 0, rate);
         logError(message);
       }
       try
@@ -1083,16 +1868,18 @@
         Runtime runtime = Runtime.getRuntime();
         long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
         EnvironmentStats envStats =
-            rootContainer.getEnvironmentStats(new StatsConfig());
+                rootContainer.getEnvironmentStats(new StatsConfig());
         long nCacheMiss =
-            envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+                envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
 
         float cacheMissRate = 0;
-        if (deltaCount > 0) {
-          cacheMissRate = nCacheMiss/(float)deltaCount;
+        if (deltaCount > 0)
+        {
+          cacheMissRate = nCacheMiss / (float) deltaCount;
         }
-        message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
-            freeMemory, cacheMissRate);
+        message =
+                NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
+                        cacheMissRate);
         logError(message);
         long evictPasses = envStats.getNEvictPasses();
         long evictNodes = envStats.getNNodesExplicitlyEvicted();
@@ -1102,37 +1889,196 @@
         long cleanerEntriesRead = envStats.getNCleanerEntriesRead();
         long cleanerINCleaned = envStats.getNINsCleaned();
         long checkPoints = envStats.getNCheckpoints();
-        if(evictPasses != 0) {
-          if(!evicting) {
-            evicting=true;
-            if(!ldifRead) {
-              evictionEntryCount=reader.getEntriesRead();
+        if (evictPasses != 0)
+        {
+          if (!evicting)
+          {
+            evicting = true;
+            if (!ldifRead)
+            {
+              evictionEntryCount = reader.getEntriesRead();
               message =
-                 NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
+                      NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED
+                              .get(evictionEntryCount);
               logError(message);
             }
           }
           message =
-                  NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
-                          evictNodes, evictBinsStrip);
+                  NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
+                          evictPasses, evictNodes, evictBinsStrip);
           logError(message);
         }
-        if(cleanerRuns != 0) {
-          message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
-                  cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
+        if (cleanerRuns != 0)
+        {
+          message =
+                  NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
+                          cleanerDeletions, cleanerEntriesRead,
+                          cleanerINCleaned);
           logError(message);
         }
-        if(checkPoints  > 1) {
-          message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
+        if (checkPoints > 1)
+        {
+          message =
+                  NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
           logError(message);
         }
         prevEnvStats = envStats;
-      } catch (DatabaseException e) {
+      }
+      catch (DatabaseException e)
+      {
         // Unlikely to happen and not critical.
       }
       previousCount = latestCount;
       previousTime = latestTime;
     }
   }
-}
 
+
+
+  /**
+   * This class reports progress of the import job at fixed intervals.
+   */
+  private final class SecondPhaseProgressTask extends TimerTask
+  {
+    /**
+     * The number of entries that had been read at the time of the
+     * previous progress report.
+     */
+    private long previousCount = 0;
+
+    /**
+     * The time in milliseconds of the previous progress report.
+     */
+    private long previousTime;
+
+    /**
+     * The environment statistics at the time of the previous report.
+     */
+    private EnvironmentStats prevEnvStats;
+
+    /**
+     * The number of bytes in a megabyte. Note that 1024*1024 bytes may
+     * eventually become known as a mebibyte(MiB).
+     */
+    public static final int bytesPerMegabyte = 1024 * 1024;
+
+    // Determines if eviction has been detected.
+    private boolean evicting = false;
+
+    // Suspend output.
+    private boolean pause = false;
+
+    private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap;
+
+
+    /**
+     * Create a new import progress task.
+     * @param containerIndexMgrMap Map of database container objects to
+     *                             index manager objects.
+     */
+    public SecondPhaseProgressTask(Map<DatabaseContainer,
+            IndexManager> containerIndexMgrMap)
+    {
+      previousTime = System.currentTimeMillis();
+      this.containerIndexMgrMap = containerIndexMgrMap;
+      try
+      {
+        prevEnvStats =
+                rootContainer.getEnvironmentStats(new StatsConfig());
+      }
+      catch (DatabaseException e)
+      {
+        throw new RuntimeException(e);
+      }
+    }
+
+
+    /**
+     * The action to be performed by this timer task.
+     */
+    @Override
+    public void run()
+    {
+      long latestCount = reader.getEntriesRead() + 0;
+      long deltaCount = (latestCount - previousCount);
+      long latestTime = System.currentTimeMillis();
+      long deltaTime = latestTime - previousTime;
+      Message message;
+      if (deltaTime == 0)
+      {
+        return;
+      }
+      if (pause)
+      {
+        return;
+      }
+
+      try
+      {
+        Runtime runtime = Runtime.getRuntime();
+        long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
+        EnvironmentStats envStats =
+                rootContainer.getEnvironmentStats(new StatsConfig());
+        long nCacheMiss =
+                envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+
+        float cacheMissRate = 0;
+        if (deltaCount > 0)
+        {
+          cacheMissRate = nCacheMiss / (float) deltaCount;
+        }
+        message =
+                NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
+                        cacheMissRate);
+        logError(message);
+        long evictPasses = envStats.getNEvictPasses();
+        long evictNodes = envStats.getNNodesExplicitlyEvicted();
+        long evictBinsStrip = envStats.getNBINsStripped();
+        long cleanerRuns = envStats.getNCleanerRuns();
+        long cleanerDeletions = envStats.getNCleanerDeletions();
+        long cleanerEntriesRead = envStats.getNCleanerEntriesRead();
+        long cleanerINCleaned = envStats.getNINsCleaned();
+        long checkPoints = envStats.getNCheckpoints();
+        if (evictPasses != 0)
+        {
+          if (!evicting)
+          {
+            evicting = true;
+          }
+          message =
+                  NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
+                          evictPasses, evictNodes, evictBinsStrip);
+          logError(message);
+        }
+        if (cleanerRuns != 0)
+        {
+          message =
+                  NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
+                          cleanerDeletions, cleanerEntriesRead,
+                          cleanerINCleaned);
+          logError(message);
+        }
+        if (checkPoints > 1)
+        {
+          message =
+                  NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
+          logError(message);
+        }
+        prevEnvStats = envStats;
+      }
+      catch (DatabaseException e)
+      {
+        // Unlikely to happen and not critical.
+      }
+      previousCount = latestCount;
+      previousTime = latestTime;
+
+      for(Map.Entry<DatabaseContainer, IndexManager> e :
+              containerIndexMgrMap.entrySet())
+      {
+        IndexManager indexMgr = e.getValue();
+        indexMgr.printStats(deltaTime);
+      }
+    }
+  }
+}

--
Gitblit v1.10.0