mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

dugan
25.32.2009 328ec50e683c622586d30aeb9dee55bebdebfe0c
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -27,954 +27,1756 @@
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.types.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import org.opends.messages.Message;
import org.opends.messages.Category;
import org.opends.messages.Severity;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.LDIFException;
import org.opends.server.util.RuntimeInformation;
import static org.opends.server.util.DynamicConstants.BUILD_ID;
import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.backends.jeb.*;
import org.opends.messages.Message;
import org.opends.messages.JebMessages;
import static org.opends.messages.JebMessages.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.io.IOException;
import org.opends.server.types.*;
import org.opends.server.util.*;
import com.sleepycat.je.*;
/**
 * Performs a LDIF import.
 */
public class Importer
{
  private final int DRAIN_TO = 3;
  private final int TIMER_INTERVAL = 10000;
  private final int MB =  (1024 * 1024);
  private final int LDIF_READER_BUF_SIZE = 2 * MB;
  private final int MIN_IMPORT_MEM_REQUIRED = 16 * MB;
  private final int MAX_BUFFER_SIZE = 48 * MB;
  private final int MIN_BUFFER_SIZE = 1024 * 100;
  private final int MIN_READ_AHEAD_CACHE_SIZE = 4096;
  private final int MAX_DB_CACHE_SIZE = 128 * MB;
  private final int MIN_DB_CACHE_SIZE = 16 * MB;
  private final int MAX_DB_LOG_BUF_BYTES = 100 * MB;
  private final int MEM_PCT_PHASE_1 = 60;
  private final int MEM_PCT_PHASE_2 = 50;
public class Importer implements Thread.UncaughtExceptionHandler {
  private final String DIRECT_PROPERTY = "import.directphase2";
  private final AtomicInteger bufferCount = new AtomicInteger(0);
  private final File tempDir;
  private final int indexCount, threadCount;
  private final boolean dn2idPhase2;
  private final LDIFImportConfig config;
  private final ByteBuffer directBuffer;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The JE backend configuration.
   */
  private LocalDBBackendCfg config;
  /**
   * The root container used for this import job.
   */
  private RootContainer rootContainer;
  /**
   * The LDIF import configuration.
   */
  private LDIFImportConfig ldifImportConfig;
  /**
   * The LDIF reader.
   */
  private LDIFReader reader;
  /**
   * Map of base DNs to their import context.
   */
  private LinkedHashMap<DN, DNContext> importMap =
      new LinkedHashMap<DN, DNContext>();
  private int bufferSize;
  private long dbCacheSize = 0, dbLogBufSize = 0;
  /**
    * The number of entries migrated.
    */
   private int migratedCount;
  //The executor service used for the sort tasks.
  private ExecutorService sortService;
  /**
   * The number of entries imported.
   */
  private int importedCount;
  //The executor service used for the index processing tasks.
  private ExecutorService indexProcessService;
  /**
   * The number of milliseconds between job progress reports.
   */
  private long progressInterval = 10000;
  //Queue of free index buffers -- used to re-cycle index buffers;
  private final BlockingQueue<IndexBuffer> freeBufQue =
          new LinkedBlockingQueue<IndexBuffer>();
  /**
   * The progress report timer.
   */
  private Timer timer;
  //Map of DB containers to que of index buffers.  Used to allocate sorted
  //index buffers to a index writer thread.
  private final
  Map<DatabaseContainer, BlockingQueue<IndexBuffer>> containerQueMap =
          new LinkedHashMap<DatabaseContainer, BlockingQueue<IndexBuffer>>();
  //Thread array.
  private CopyOnWriteArrayList<WorkThread> threads;
  //Map of DB containers to index managers. Used to start phase 2.
  private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap =
          new LinkedHashMap<DatabaseContainer, IndexManager>();
  //Progress task.
  private ProgressTask pTask;
  //Futures used to indicate when the index file writers are done flushing
  //their work queues and have exited. End of phase one.
  private final List<Future<?>> indexWriterFutures;
  //Number of entries import before checking if cleaning is needed after
  //eviction has been detected.
  private static final int entryCleanInterval = 250000;
  //List of index file writer tasks. Used to signal stopIndexWriterTasks to the
  //index file writer tasks when the LDIF file has been done.
  private final List<IndexFileWriterTask> indexWriterList;
  //Minimum buffer amount to give to a buffer manager.
  private static final long minBuffer = 1024 * 1024;
  //Total available memory for the buffer managers.
  private long totalAvailBufferMemory = 0;
  //Memory size to be used for the DB cache in string format.
  private String dbCacheSizeStr;
  //Used to do an initial clean after eviction has been detected.
  private boolean firstClean=false;
  //A thread threw an Runtime exception stop the import.
  private boolean unCaughtExceptionThrown = false;
  //Set to true if substring indexes are defined.
  private boolean hasSubIndexes = false;
  //Work thread 0, used to add the first 20 or so entries single threaded.
  private WorkThread workThread0;
  //Counter for thread 0;
  private int worker0Proc=0;
  //Max thread 0 adds.
  private static final int maxWorker0 = 20;
  //Map of DNs to Suffix objects. Placeholder for when multiple suffixes are
  //supported.
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
  /**
   * Create a new import job with the specified ldif import config.
   *
   * @param ldifImportConfig The LDIF import config.
   * @param hasSubIndexes <CODE>True</CODE> If substring indexes are defined.
   * @param config The LDIF import config.
   * @param cfg The local DB backend config.
   * @throws IOException  If a problem occurs while opening the LDIF file for
   *                      reading.
   */
  public Importer(LDIFImportConfig ldifImportConfig, boolean hasSubIndexes)
  public Importer(LDIFImportConfig config,
                  LocalDBBackendCfg cfg )
          throws IOException
  {
    this.ldifImportConfig = ldifImportConfig;
    this.threads = new CopyOnWriteArrayList<WorkThread>();
    this.hasSubIndexes = hasSubIndexes;
    calcMemoryLimits();
    this.config = config;
    threadCount = cfg.getImportThreadCount();
    indexCount = cfg.listLocalDBIndexes().length + 2;
    indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount);
    indexWriterFutures = new CopyOnWriteArrayList<Future<?>>();
    File parentDir;
    if(config.getTmpDirectory() == null)
    {
      parentDir = getFileForPath("import-tmp");
    }
    else
    {
       parentDir = getFileForPath(config.getTmpDirectory());
    }
    tempDir = new File(parentDir, cfg.getBackendId());
    if(!tempDir.exists() && !tempDir.mkdirs())
    {
      Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(
              String.valueOf(tempDir));
      throw new IOException(msg.toString());
    }
    if (tempDir.listFiles() != null)
    {
      for (File f : tempDir.listFiles())
      {
        f.delete();
      }
    }
    dn2idPhase2 = config.getDNCheckPhase2();
    String propString = System.getProperty(DIRECT_PROPERTY);
    if(propString != null)
    {
      int directSize = Integer.valueOf(propString);
      directBuffer = ByteBuffer.allocateDirect(directSize);
    }
    else
    {
     directBuffer = null;
    }
  }
  private void getBufferSizes(long availMem, int buffers)
  {
    long mem = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_BUF_BYTES);
    bufferSize = (int) (mem/buffers);
    if(bufferSize >= MIN_BUFFER_SIZE)
    {
      dbCacheSize =  MAX_DB_CACHE_SIZE;
      dbLogBufSize = MAX_DB_LOG_BUF_BYTES;
      if(bufferSize > MAX_BUFFER_SIZE)
      {
        bufferSize = MAX_BUFFER_SIZE;
      }
    }
    else
    {
      mem = availMem - MIN_DB_CACHE_SIZE - (MIN_DB_CACHE_SIZE * 7) / 100;
      bufferSize = (int) (mem/buffers);
      dbCacheSize =  MIN_DB_CACHE_SIZE;
      if(bufferSize < MIN_BUFFER_SIZE)
      {
        System.out.println("Log size less than default -- give it a try");
        bufferSize = MIN_BUFFER_SIZE;
      }
      else
      {
        long constrainedMem = mem - (buffers * MIN_BUFFER_SIZE);
        bufferSize = (int) ((buffers * MIN_BUFFER_SIZE) +
                            (constrainedMem * 50/100));
        bufferSize /= buffers;
        dbCacheSize = MIN_DB_CACHE_SIZE + (constrainedMem * 50/100);
      }
    }
  }
  /**
   * Return the suffix instance in the specified map that matches the specified
   * DN.
   *
   * @param dn The DN to search for.
   * @param map The map to search.
   * @return The suffix instance that matches the DN, or null if no match is
   *         found.
   */
  public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
  {
    Suffix suffix = null;
    DN nodeDN = dn;
    while (suffix == null && nodeDN != null) {
      suffix = map.get(nodeDN);
      if (suffix == null)
      {
        nodeDN = nodeDN.getParentDNInSuffix();
      }
    }
    return suffix;
  }
  /**
   * Start the worker threads.
   * Calculate buffer sizes and initialize JEB properties based on memory.
   *
   * @throws DatabaseException If a DB problem occurs.
   * @param envConfig The environment config to use in the calculations.
   *
   * @throws InitializationException If a problem occurs during calculation.
   */
  private void startWorkerThreads()
          throws DatabaseException {
    int importThreadCount = config.getImportThreadCount();
    //Figure out how much buffer memory to give to each context.
    int contextCount = importMap.size();
    long memoryPerContext = totalAvailBufferMemory / contextCount;
    //Below min, use the min value.
    if(memoryPerContext < minBuffer) {
      Message msg =
            NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
                                                             minBuffer);
  public void init(EnvironmentConfig envConfig)
          throws InitializationException
  {
    Message msg;
    Runtime runtime = Runtime.getRuntime();
    long freeMemory = runtime.freeMemory();
    long availMemImport = (freeMemory * MEM_PCT_PHASE_1) / 100;
    int phaseOneBuffers = 2 * (indexCount * threadCount);
    msg = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availMemImport, phaseOneBuffers);
    logError(msg);
    if (availMemImport < MIN_IMPORT_MEM_REQUIRED)
    {
      msg = ERR_IMPORT_LDIF_LACK_MEM.get(16);
      throw new InitializationException(msg);
    }
    getBufferSizes(availMemImport, phaseOneBuffers);
    envConfig.setConfigParam("je.maxMemory", Long.toString(dbCacheSize));
    msg = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
    logError(msg);
    if(dbLogBufSize != 0)
    {
      envConfig.setConfigParam("je.log.totalBufferBytes",
              Long.toString(dbLogBufSize));
      msg = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(dbLogBufSize);
      logError(msg);
      memoryPerContext = minBuffer;
    }
    // Create one set of worker threads/buffer managers for each base DN.
    for (DNContext context : importMap.values()) {
      BufferManager bufferManager =
                        new BufferManager(memoryPerContext);
      context.setBufferManager(bufferManager);
      for (int i = 0; i < importThreadCount; i++) {
        WorkThread t = new WorkThread(context.getWorkQueue(), i,
                bufferManager, rootContainer, importMap);
        t.setUncaughtExceptionHandler(this);
        threads.add(t);
        if(i == 0) {
          workThread0 = t;
        }
        t.start();
      }
    }
    // Start a timer for the progress report.
    timer = new Timer();
    TimerTask progressTask = new ProgressTask();
    //Used to get at extra functionality such as eviction detected.
    pTask = (ProgressTask) progressTask;
    timer.scheduleAtFixedRate(progressTask, progressInterval,
                              progressInterval);
    return;
  }
  private void initIndexBuffers(int threadCount)
  {
    int bufferCount = 2 * (indexCount * threadCount);
    for(int i = 0; i < bufferCount; i++)
    {
      IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
      freeBufQue.add(b);
    }
  }
  private void initSuffixes()
          throws ConfigException, InitializationException
  {
    Iterator<EntryContainer> i = rootContainer.getEntryContainers().iterator();
    EntryContainer ec = i.next();
    Suffix suffix = Suffix.createSuffixContext(ec, config, rootContainer);
    dnSuffixMap.put(ec.getBaseDN(), suffix);
  }
  /**
   * Import a ldif using the specified root container.
   *
   * @param rootContainer  The root container.
   * @param rootContainer The root container to use during the import.
   *
   * @return A LDIF result.
   * @throws DatabaseException  If a DB error occurs.
   * @throws IOException If a IO error occurs.
   * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
   * @throws DirectoryException If a directory error occurs.
   * @throws ConfigException If a configuration has an error.
   * @throws ConfigException If the import failed because of an configuration
   *                         error.
   * @throws IOException If the import failed because of an IO error.
   * @throws InitializationException If the import failed because of an
   *               initialization error.
   * @throws JebException If the import failed due to a database error.
   * @throws InterruptedException If the import failed due to an interrupted
   *                              error.
   * @throws ExecutionException If the import failed due to an execution error.
   */
  public LDIFImportResult processImport(RootContainer rootContainer)
      throws DatabaseException, IOException, JebException, DirectoryException,
            ConfigException {
    // Create an LDIF reader. Throws an exception if the file does not exist.
    reader = new LDIFReader(ldifImportConfig);
  public LDIFImportResult
  processImport(RootContainer rootContainer) throws ConfigException,
          InitializationException, IOException, JebException,
          InterruptedException, ExecutionException
  {
    this.rootContainer = rootContainer;
    this.config = rootContainer.getConfiguration();
    Message message;
    long startTime;
    try {
      int importThreadCount = config.getImportThreadCount();
      message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
                                                     BUILD_ID, REVISION_NUMBER);
      logError(message);
      message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
      logError(message);
      RuntimeInformation.logInfo();
      for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
        DNContext DNContext =  getImportContext(entryContainer);
        if(DNContext != null) {
          importMap.put(entryContainer.getBaseDN(), DNContext);
        }
      }
      // Make a note of the time we started.
      startTime = System.currentTimeMillis();
      startWorkerThreads();
      try {
        importedCount = 0;
        migratedCount = 0;
        migrateExistingEntries();
        processLDIF();
        migrateExcludedEntries();
      } finally {
        if(!unCaughtExceptionThrown) {
          cleanUp();
          switchContainers();
        }
      }
    }
    finally {
      reader.close();
    }
    importProlog(startTime);
    return new LDIFImportResult(reader.getEntriesRead(),
                                reader.getEntriesRejected(),
                                reader.getEntriesIgnored());
  }
  /**
   * Switch containers if the migrated entries were written to the temporary
   * container.
   *
   * @throws DatabaseException If a DB problem occurs.
   * @throws JebException If a JEB problem occurs.
   */
  private void switchContainers() throws DatabaseException, JebException {
    for(DNContext importContext : importMap.values()) {
      DN baseDN = importContext.getBaseDN();
      EntryContainer srcEntryContainer =
              importContext.getSrcEntryContainer();
      if(srcEntryContainer != null) {
        if (debugEnabled()) {
          TRACER.debugInfo("Deleteing old entry container for base DN " +
                  "%s and renaming temp entry container", baseDN);
        }
        EntryContainer unregEC =
                rootContainer.unregisterEntryContainer(baseDN);
        //Make sure the unregistered EC for the base DN is the same as
        //the one in the import context.
        if(unregEC != srcEntryContainer) {
          if(debugEnabled()) {
            TRACER.debugInfo("Current entry container used for base DN " +
                    "%s is not the same as the source entry container used " +
                    "during the migration process.", baseDN);
          }
          rootContainer.registerEntryContainer(baseDN, unregEC);
          continue;
        }
        srcEntryContainer.lock();
        srcEntryContainer.close();
        srcEntryContainer.delete();
        srcEntryContainer.unlock();
        EntryContainer newEC = importContext.getEntryContainer();
        newEC.lock();
        newEC.setDatabasePrefix(baseDN.toNormalizedString());
        newEC.unlock();
        rootContainer.registerEntryContainer(baseDN, newEC);
      }
    }
  }
  /**
   * Create and log messages at the end of the successful import.
   *
   * @param startTime The time the import started.
   */
  private void importProlog(long startTime) {
    Message message;
    this.reader = new LDIFReader(config, rootContainer, LDIF_READER_BUF_SIZE);
    Message message =
            NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
                    BUILD_ID, REVISION_NUMBER);
    logError(message);
    message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
    logError(message);
    RuntimeInformation.logInfo();
    initSuffixes();
    long startTime = System.currentTimeMillis();
    processPhaseOne();
    processPhaseTwo();
    setIndexesTrusted();
    tempDir.delete();
    long finishTime = System.currentTimeMillis();
    long importTime = (finishTime - startTime);
    float rate = 0;
    if (importTime > 0)
      rate = 1000f * reader.getEntriesRead() / importTime;
    message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
            reader.getEntriesRead(), reader.getEntriesIgnored(), reader
                    .getEntriesRejected(), 0, importTime / 1000, rate);
    logError(message);
    return new LDIFImportResult(reader.getEntriesRead(), reader
            .getEntriesRejected(), reader.getEntriesIgnored());
  }
  private void setIndexesTrusted() throws JebException
  {
    try {
      for(Suffix s : dnSuffixMap.values()) {
        s.setIndexesTrusted();
      }
    }
    catch (DatabaseException ex)
    {
      rate = 1000f*importedCount / importTime;
      Message msg = NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
      throw new JebException(msg);
    }
  }
    message = NOTE_JEB_IMPORT_FINAL_STATUS.
        get(reader.getEntriesRead(), importedCount,
            reader.getEntriesIgnored(), reader.getEntriesRejected(),
            migratedCount, importTime/1000, rate);
    logError(message);
    message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
        getEntryLimitExceededCount());
    logError(message);
  private void processPhaseOne() throws InterruptedException, ExecutionException
  {
    initIndexBuffers(threadCount);
    FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
    Timer timer = new Timer();
    timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
    indexProcessService = Executors.newFixedThreadPool(2 * indexCount);
    sortService = Executors.newFixedThreadPool(threadCount);
    //Import tasks are collective tasks.
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    for (int i = 0; i < threadCount; i++)
    {
      tasks.add(new ImportTask());
    }
    ExecutorService execService = Executors.newFixedThreadPool(threadCount);
      List<Future<Void>> results = execService.invokeAll(tasks);
      for (Future<Void> result : results)
        assert result.isDone();
    stopIndexWriterTasks();
    for (Future<?> result : indexWriterFutures)
    {
        result.get();
    }
    execService.shutdown();
    freeBufQue.clear();
    sortService.shutdown();
    timer.cancel();
  }
  private void processPhaseTwo() throws InterruptedException
  {
    SecondPhaseProgressTask progress2Task =
            new SecondPhaseProgressTask(containerIndexMgrMap);
    Timer timer2 = new Timer();
    timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
    processIndexFiles();
    timer2.cancel();
  }
  private void processIndexFiles() throws InterruptedException
  {
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(indexCount);
    if(bufferCount.get() == 0)
    {
      return;
    }
    int cacheSize =  cacheSizeFromFreeMemory();
    int p = 0;
    int offSet = 0;
    if(directBuffer != null)
    {
      cacheSize = cacheSizeFromDirectMemory();
    }
    for(Map.Entry<DatabaseContainer, IndexManager> e :
            containerIndexMgrMap.entrySet())
    {
      DatabaseContainer container = e.getKey();
      IndexManager indexMgr = e.getValue();
      boolean isDN2ID = false;
      if(container instanceof DN2ID)
      {
        isDN2ID = true;
      }
      if(directBuffer != null)
      {
        int cacheSizes = cacheSize * indexMgr.getBufferList().size();
        offSet += cacheSizes;
        directBuffer.limit(offSet);
        directBuffer.position(p);
        ByteBuffer b = directBuffer.slice();
        tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, b, cacheSize));
        p += cacheSizes;
      }
      else
      {
        tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, cacheSize));
      }
    }
    List<Future<Void>> results = indexProcessService.invokeAll(tasks);
    for (Future<Void> result : results)
      assert result.isDone();
    indexProcessService.shutdown();
  }
  private int cacheSizeFromDirectMemory()
  {
    int cap = directBuffer.capacity();
    int cacheSize = cap/bufferCount.get();
    if(cacheSize > bufferSize)
    {
      cacheSize = bufferSize;
    }
    System.out.println("Direct indexes begin Total bufferCount: " +
            bufferCount.get() + " cacheSize: " + cacheSize);
    return cacheSize;
  }
  private int cacheSizeFromFreeMemory()
  {
    Runtime runtime = Runtime.getRuntime();
    long availMemory = runtime.freeMemory()  * MEM_PCT_PHASE_2 / 100;
    int avgBufSize = (int)(availMemory / bufferCount.get());
    int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, avgBufSize);
    if(cacheSize > bufferSize)
    {
      cacheSize = bufferSize;
    }
    System.out.println("Indirect indexes begin Total bufferCount: " +
            bufferCount.get() + " avgBufSize: "
            + avgBufSize + " cacheSize: " + cacheSize);
    return cacheSize;
  }
  private void stopIndexWriterTasks()
  {
    IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
    for(IndexFileWriterTask task : indexWriterList)
    {
      task.que.add(idxBuffer);
    }
  }
  /**
   * Run the cleaner if it is needed.
   *
   * @param entriesRead The number of entries read so far.
   * @param evictEntryNumber The number of entries to run the cleaner after
   * being read.
   * @throws DatabaseException If a DB problem occurs.
   * This task processes the LDIF file during phase 1.
   */
  private void
  runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
          throws DatabaseException {
    if(!firstClean || (entriesRead %  evictEntryNumber) == 0) {
      //Make sure work queue is empty before starting.
      drainWorkQueue();
      Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get();
      runCleaner(msg);
      if(!firstClean) {
        firstClean=true;
      }
    }
  }
  private final class ImportTask implements Callable<Void> {
    private final Map<Suffix, Map<DatabaseContainer, IndexBuffer>> suffixMap =
            new HashMap<Suffix, Map<DatabaseContainer, IndexBuffer>>();
    private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
    private final IndexBuffer.DNComparator dnComparator
            = new IndexBuffer.DNComparator();
    private final IndexBuffer.IndexComparator indexComparator =
            new IndexBuffer.IndexComparator();
  /**
   * Run the cleaner, pausing the task thread output.
   *
   * @param header Message to be printed before cleaning.
   * @throws DatabaseException If a DB problem occurs.
   */
  private void runCleaner(Message header) throws DatabaseException {
    Message msg;
    long startTime = System.currentTimeMillis();
    //Need to force a checkpoint.
    rootContainer.importForceCheckPoint();
    logError(header);
    pTask.setPause(true);
    //Actually clean the files.
    int cleaned = rootContainer.cleanedLogFiles();
    //This checkpoint removes the files if any were cleaned.
    if(cleaned > 0) {
      msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
      logError(msg);
      rootContainer.importForceCheckPoint();
    }
    pTask.setPause(false);
    long finishTime = System.currentTimeMillis();
    long cleanTime = (finishTime - startTime) / 1000;
    msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
    logError(msg);
  }
  /**
   * Process a LDIF reader.
   *
   * @throws JebException If a JEB problem occurs.
   * @throws DatabaseException If a DB problem occurs.
   * @throws IOException If an IO exception occurs.
   */
  private void
  processLDIF() throws JebException, DatabaseException, IOException {
    Message message = NOTE_JEB_IMPORT_LDIF_START.get();
    logError(message);
    do {
      if (ldifImportConfig.isCancelled()) {
        break;
      }
      if(threads.size() <= 0) {
        message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
        throw new JebException(message);
      }
      if(unCaughtExceptionThrown) {
        abortImport();
      }
      try {
        // Read the next entry.
        Entry entry = reader.readEntry();
        // Check for end of file.
        if (entry == null) {
          message = NOTE_JEB_IMPORT_LDIF_END.get();
          logError(message);
    /**
     * {@inheritDoc}
     */
    public Void call() throws Exception
    {
      Suffix suffix = null;
      while (true)
      {
        if (config.isCancelled())
        {
          IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
          freeBufQue.add(idxBuffer);
          return null;
        }
        Entry entry = reader.readEntry(dnSuffixMap);
        if (entry == null)
        {
          break;
        }
        // Route it according to base DN.
        DNContext DNContext = getImportConfig(entry.getDN());
        processEntry(DNContext, entry);
        //If the progress task has noticed eviction proceeding, start running
        //the cleaner.
        if(pTask.isEvicting()) {
          runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
        }
      }  catch (LDIFException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      } catch (DirectoryException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      } catch (DatabaseException e)  {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    } while (true);
  }
  /**
   * Process an entry using the specified import context.
   *
   * @param DNContext The import context.
   * @param entry The entry to process.
   */
  private void processEntry(DNContext DNContext, Entry entry)
               throws DirectoryException, DatabaseException, JebException {
    if(worker0Proc < maxWorker0) {
       DNContext.addPending(entry.getDN());
       WorkElement element =
            WorkElement.decode(entry, DNContext);
        workThread0.process(element);
        worker0Proc++;
    } else {
      //Add this DN to the pending map.
      DNContext.addPending(entry.getDN());
      addEntryQueue(DNContext, entry);
    }
  }
  /**
   * Add work item to specified import context's queue.
   * @param context The import context.
   * @param item The work item to add.
   * @return <CODE>True</CODE> if the the work  item was added to the queue.
   */
  private boolean
  addQueue(DNContext context, WorkElement item) {
    try {
      while(!context.getWorkQueue().offer(item, 1000,
                                            TimeUnit.MILLISECONDS)) {
        if(threads.size() <= 0) {
          // All worker threads died. We must stop now.
          return false;
        }
      }
    } catch (InterruptedException e) {
      if (debugEnabled()) {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    return true;
  }
  /**
   * Wait until the work queue is empty.
   */
  private void drainWorkQueue() {
    if(threads.size() > 0) {
      for (DNContext context : importMap.values()) {
        while (context.getWorkQueue().size() > 0) {
          try {
            Thread.sleep(100);
          } catch (Exception e) {
            // No action needed.
          }
        }
      }
    }
  }
  private void abortImport() throws JebException {
    //Stop work threads telling them to skip substring flush.
     stopWorkThreads(false);
     timer.cancel();
     Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
     throw new JebException(message);
  }
  /**
   * Stop work threads.
   *
   * @param abort <CODE>True</CODE> if stop work threads was called from an
   *              abort.
   * @throws JebException if a Jeb error occurs.
   */
  private void
  stopWorkThreads(boolean abort) throws JebException {
    for (WorkThread t : threads) {
      t.stopProcessing();
    }
    // Wait for each thread to stop.
    for (WorkThread t : threads) {
      try {
        if(!abort && unCaughtExceptionThrown) {
          timer.cancel();
          Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
          throw new JebException(message);
        }
        t.join();
        importedCount += t.getImportedCount();
      } catch (InterruptedException ie) {
        // No action needed?
      }
    }
  }
  /**
   * Clean up after a successful import.
   *
   * @throws DatabaseException If a DB error occurs.
   * @throws JebException If a Jeb error occurs.
   */
  private void cleanUp() throws DatabaseException, JebException {
     Message msg;
    //Drain the work queue.
    drainWorkQueue();
    pTask.setPause(true);
    long startTime = System.currentTimeMillis();
    stopWorkThreads(true);
    //Flush the buffer managers.
    for(DNContext context : importMap.values()) {
      context.getBufferManager().prepareFlush();
      context.getBufferManager().flushAll();
    }
    long finishTime = System.currentTimeMillis();
    long flushTime = (finishTime - startTime) / 1000;
     msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
    logError(msg);
    timer.cancel();
    for(DNContext context : importMap.values()) {
      context.setIndexesTrusted();
    }
    msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
    //Run the cleaner.
    runCleaner(msg);
    closeIndexCursors();
  }
   private void closeIndexCursors() throws DatabaseException {
    for (DNContext ic : importMap.values())
    {
      ic.getEntryContainer().closeIndexCursors();
    }
  }
  /**
   * Uncaught exception handler.
   *
   * @param t The thread working when the exception was thrown.
   * @param e The exception.
   */
  public void uncaughtException(Thread t, Throwable e) {
     unCaughtExceptionThrown = true;
     threads.remove(t);
     Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
         t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
     logError(msg);
   }
  /**
   * Get the entry limit exceeded counts from the indexes.
   *
   * @return Count of the index with entry limit exceeded values.
   */
  private int getEntryLimitExceededCount() {
    int count = 0;
    for (DNContext ic : importMap.values())
    {
      count += ic.getEntryContainer().getEntryLimitExceededCount();
    }
    return count;
  }
  /**
   * Return an import context related to the specified DN.
   * @param dn The dn.
   * @return  An import context.
   * @throws DirectoryException If an directory error occurs.
   */
  private DNContext getImportConfig(DN dn) throws DirectoryException {
    DNContext DNContext = null;
    DN nodeDN = dn;
    while (DNContext == null && nodeDN != null) {
      DNContext = importMap.get(nodeDN);
      if (DNContext == null)
      {
        nodeDN = nodeDN.getParentDNInSuffix();
      }
    }
    if (nodeDN == null) {
      // The entry should not have been given to this backend.
      Message message =
              JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
      throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
    }
    return DNContext;
  }
  /**
   * Creates an import context for the specified entry container.
   *
   * @param entryContainer The entry container.
   * @return Import context to use during import.
   * @throws DatabaseException If a database error occurs.
   * @throws JebException If a JEB error occurs.
   * @throws ConfigException If a configuration contains error.
   */
   private DNContext getImportContext(EntryContainer entryContainer)
      throws DatabaseException, JebException, ConfigException {
    DN baseDN = entryContainer.getBaseDN();
    EntryContainer srcEntryContainer = null;
    List<DN> includeBranches = new ArrayList<DN>();
    List<DN> excludeBranches = new ArrayList<DN>();
    if(!ldifImportConfig.appendToExistingData() &&
        !ldifImportConfig.clearBackend())
    {
      for(DN dn : ldifImportConfig.getExcludeBranches())
      {
        if(baseDN.equals(dn))
        DN entryDN = entry.getDN();
        EntryID entryID = (EntryID) entry.getAttachment();
        suffix = getMatchSuffix(entryDN, dnSuffixMap);
        if(!suffixMap.containsKey(suffix))
        {
          // This entire base DN was explicitly excluded. Skip.
          return null;
          suffixMap.put(suffix, new HashMap<DatabaseContainer, IndexBuffer>());
        }
        if(baseDN.isAncestorOf(dn))
        if(!dn2idPhase2)
        {
          excludeBranches.add(dn);
        }
      }
      if(!ldifImportConfig.getIncludeBranches().isEmpty())
      {
        for(DN dn : ldifImportConfig.getIncludeBranches())
        {
          if(baseDN.isAncestorOf(dn))
          if(!processParent(entryDN, entryID, entry, suffix))
          {
            includeBranches.add(dn);
            suffix.removePending(entryDN);
            continue;
          }
        }
        if(includeBranches.isEmpty())
        {
          // There are no branches in the explicitly defined include list under
          // this base DN. Skip this base DN alltogether.
          return null;
        }
        // Remove any overlapping include branches.
        Iterator<DN> includeBranchIterator = includeBranches.iterator();
        while(includeBranchIterator.hasNext())
        {
          DN includeDN = includeBranchIterator.next();
          boolean keep = true;
          for(DN dn : includeBranches)
          if(!suffix.getDN2ID().insert(null, entryDN, entryID))
          {
            if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
            {
              keep = false;
              break;
            }
            suffix.removePending(entryDN);
             Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
             reader.rejectEntry(entry, msg);
            continue;
          }
          if(!keep)
          {
            includeBranchIterator.remove();
          }
        }
        // Remvoe any exclude branches that are not are not under a include
        // branch since they will be migrated as part of the existing entries
        // outside of the include branches anyways.
        Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
        while(excludeBranchIterator.hasNext())
        {
          DN excludeDN = excludeBranchIterator.next();
          boolean keep = false;
          for(DN includeDN : includeBranches)
          {
            if(includeDN.isAncestorOf(excludeDN))
            {
              keep = true;
              break;
            }
          }
          if(!keep)
          {
            excludeBranchIterator.remove();
          }
        }
        if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
            includeBranches.get(0).equals(baseDN))
        {
          // This entire base DN is explicitly included in the import with
          // no exclude branches that we need to migrate. Just clear the entry
          // container.
          entryContainer.lock();
          entryContainer.clear();
          entryContainer.unlock();
          suffix.removePending(entryDN);
          processID2SC(entryID, entry, suffix);
        }
        else
        {
          // Create a temp entry container
          srcEntryContainer = entryContainer;
          entryContainer =
              rootContainer.openEntryContainer(baseDN,
                                               baseDN.toNormalizedString() +
                                                   "_importTmp");
          processDN2ID(suffix, entryDN, entryID);
          suffix.removePending(entryDN);
        }
        suffix.getID2Entry().put(null, entryID, entry);
        processIndexes(suffix, entry, entryID);
      }
      flushIndexBuffers();
      if(!dn2idPhase2)
      {
        suffix.getEntryContainer().getID2Children().closeCursor();
        suffix.getEntryContainer().getID2Subtree().closeCursor();
      }
      return null;
    }
    private boolean processParent(DN entryDN, EntryID entryID, Entry entry,
                                  Suffix suffix) throws DatabaseException
    {
      EntryID parentID = null;
      DN parentDN =
              suffix.getEntryContainer().getParentWithinBase(entryDN);
      DN2ID dn2id = suffix.getDN2ID();
      if(dn2id.get(null, entryDN, LockMode.DEFAULT) != null)
      {
        Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
        reader.rejectEntry(entry, msg);
        return false;
      }
      if (parentDN != null) {
        parentID = suffix.getParentID(parentDN);
        if (parentID == null) {
          dn2id.remove(null, entryDN);
          Message msg =
                      ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
           reader.rejectEntry(entry, msg);
          return false;
        }
      }
    }
    // Create an import context.
    DNContext DNContext = new DNContext();
    DNContext.setConfig(config);
    DNContext.setLDIFImportConfig(this.ldifImportConfig);
    DNContext.setLDIFReader(reader);
    DNContext.setBaseDN(baseDN);
    DNContext.setEntryContainer(entryContainer);
    DNContext.setSrcEntryContainer(srcEntryContainer);
    //Create queue.
    LinkedBlockingQueue<WorkElement> works =
        new LinkedBlockingQueue<WorkElement>
                     (config.getImportQueueSize());
    DNContext.setWorkQueue(works);
    // Set the include and exclude branches
    DNContext.setIncludeBranches(includeBranches);
    DNContext.setExcludeBranches(excludeBranches);
    return DNContext;
  }
  /**
   * Add specified context and entry to the work queue.
   *
   * @param context The context related to the entry DN.
   * @param entry The entry to work on.
   * @return  <CODE>True</CODE> if the element was added to the work queue.
   */
  private boolean
  addEntryQueue(DNContext context,  Entry entry) {
    WorkElement element =
            WorkElement.decode(entry, context);
    return addQueue(context, element);
  }
  /**
   * Calculate the memory usage for the substring buffer and the DB cache.
   */
  private void calcMemoryLimits() {
    Message msg;
    Runtime runtime = Runtime.getRuntime();
    long freeMemory = runtime.freeMemory();
    long maxMemory = runtime.maxMemory();
    long totMemory = runtime.totalMemory();
    long totFreeMemory = (freeMemory + (maxMemory - totMemory));
    long dbCacheLimit = (totFreeMemory * 60) / 100;
    //If there are no substring indexes defined, set the DB cache
    //size to 75% and take a minimal substring buffer.
    if(!hasSubIndexes) {
      dbCacheLimit = (totFreeMemory * 75) / 100;
    }
    dbCacheSizeStr = Long.toString(dbCacheLimit);
    totalAvailBufferMemory = (totFreeMemory * 10) / 100;
    if(totalAvailBufferMemory < (10 * minBuffer)) {
       msg =
          NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
                                                      (10 * minBuffer));
      logError(msg);
      totalAvailBufferMemory = (10 * minBuffer);
    } else if(!hasSubIndexes) {
      totalAvailBufferMemory = (10 * minBuffer);
    }
    msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
                                             totalAvailBufferMemory);
    logError(msg);
  }
  /**
   * Return the string representation of the DB cache size.
   *
   * @return DB cache size string.
   */
  public String getDBCacheSize() {
    return dbCacheSizeStr;
  }
  /**
   * Migrate any existing entries.
   *
   * @throws JebException If a JEB error occurs.
   * @throws DatabaseException  If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExistingEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext context : importMap.values()) {
      EntryContainer srcEntryContainer = context.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !context.getIncludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
            "existing", String.valueOf(context.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        try {
          status = cursor.getFirst(key, data, lockMode);
          while(status == OperationStatus.SUCCESS &&
                !ldifImportConfig.isCancelled()) {
            if(threads.size() <= 0) {
              message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
              throw new JebException(message);
            }
            DN dn = DN.decode(ByteString.wrap(key.getData()));
            if(!context.getIncludeBranches().contains(dn)) {
              EntryID id = new EntryID(data);
              Entry entry =
                  srcEntryContainer.getID2Entry().get(null,
                      id, LockMode.DEFAULT);
              processEntry(context, entry);
              migratedCount++;
              status = cursor.getNext(key, data, lockMode);
            }  else {
              // This is the base entry for a branch that will be included
              // in the import so we don't want to copy the branch to the new
              // entry container.
              /**
               * Advance the cursor to next entry at the same level in the DIT
               * skipping all the entries in this branch.
               * Set the next starting value to a value of equal length but
               * slightly greater than the previous DN. Since keys are compared
               * in reverse order we must set the first byte (the comma).
               * No possibility of overflow here.
               */
              byte[] begin =
                  StaticUtils.getBytes("," + dn.toNormalizedString());
              begin[0] = (byte) (begin[0] + 1);
              key.setData(begin);
              status = cursor.getSearchKeyRange(key, data, lockMode);
      ArrayList<EntryID> IDs;
      if (parentDN != null && suffix.getParentDN() != null &&
              parentDN.equals(suffix.getParentDN())) {
        IDs = new ArrayList<EntryID>(suffix.getIDs());
        IDs.set(0, entryID);
      }
      else
      {
        EntryID nodeID;
        IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
        IDs.add(entryID);
        if (parentID != null)
        {
          IDs.add(parentID);
          EntryContainer ec = suffix.getEntryContainer();
          for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
               dn = ec.getParentWithinBase(dn)) {
            if((nodeID =  getAncestorID(dn2id, dn)) == null) {
              return false;
            } else {
              IDs.add(nodeID);
            }
          }
        } finally {
          cursor.close();
        }
      }
      suffix.setParentDN(parentDN);
      suffix.setIDs(IDs);
      entry.setAttachment(IDs);
      return true;
    }
  }
    private void processID2SC(EntryID entryID, Entry entry, Suffix suffix)
            throws DatabaseException
    {
      Set<byte[]> childKeySet = new HashSet<byte[]>();
      Set<byte[]> subtreeKeySet = new HashSet<byte[]>();
      Index id2children = suffix.getEntryContainer().getID2Children();
      Index id2subtree = suffix.getEntryContainer().getID2Subtree();
      id2children.indexer.indexEntry(entry, childKeySet);
      id2subtree.indexer.indexEntry(entry, subtreeKeySet);
      DatabaseEntry dbKey = new DatabaseEntry();
      DatabaseEntry dbVal = new DatabaseEntry();
      ImportIDSet idSet = new ImportIDSet();
      idSet.addEntryID(entryID, id2children.getIndexEntryLimit(),
              id2children.getMaintainCount());
      id2children.insert(idSet, childKeySet, dbKey, dbVal);
      DatabaseEntry dbSubKey = new DatabaseEntry();
      DatabaseEntry dbSubVal = new DatabaseEntry();
      ImportIDSet idSubSet = new ImportIDSet();
      idSubSet.addEntryID(entryID, id2subtree.getIndexEntryLimit(),
              id2subtree.getMaintainCount());
      id2subtree.insert(idSubSet, subtreeKeySet, dbSubKey, dbSubVal);
    }
    private EntryID getAncestorID(DN2ID dn2id, DN dn)
            throws DatabaseException
    {
      int i=0;
      EntryID nodeID = dn2id.get(null, dn, LockMode.DEFAULT);
      if(nodeID == null) {
        while((nodeID = dn2id.get(null, dn, LockMode.DEFAULT)) == null) {
          try {
            Thread.sleep(50);
            if(i == 3) {
              return null;
            }
            i++;
          } catch (Exception e) {
            return null;
          }
        }
      }
      return nodeID;
    }
  /**
   * Migrate excluded entries.
   *
   * @throws JebException If a JEB error occurs.
   * @throws DatabaseException  If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExcludedEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext importContext : importMap.values()) {
      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !importContext.getExcludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
            "excluded", String.valueOf(importContext.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        Comparator<byte[]> dn2idComparator =
            srcEntryContainer.getDN2ID().getComparator();
        try {
          for(DN excludedDN : importContext.getExcludeBranches()) {
            byte[] suffix =
                StaticUtils.getBytes(excludedDN.toNormalizedString());
            key.setData(suffix);
            status = cursor.getSearchKeyRange(key, data, lockMode);
            if(status == OperationStatus.SUCCESS &&
                Arrays.equals(key.getData(), suffix)) {
              // This is the base entry for a branch that was excluded in the
              // import so we must migrate all entries in this branch over to
              // the new entry container.
              byte[] end =
                  StaticUtils.getBytes("," + excludedDN.toNormalizedString());
              end[0] = (byte) (end[0] + 1);
              while(status == OperationStatus.SUCCESS &&
                  dn2idComparator.compare(key.getData(), end) < 0 &&
                  !ldifImportConfig.isCancelled()) {
                if(threads.size() <= 0) {
                  message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
                  throw new JebException(message);
                }
                EntryID id = new EntryID(data);
                Entry entry = srcEntryContainer.getID2Entry().get(null,
                    id, LockMode.DEFAULT);
                processEntry(importContext, entry);
                migratedCount++;
                status = cursor.getNext(key, data, lockMode);
    private void
    processIndexes(Suffix ctx, Entry entry, EntryID entryID) throws
            DatabaseException, DirectoryException, JebException, ConfigException
    {
      Transaction txn = null;
      Map<AttributeType, AttributeIndex> attrMap = ctx.getAttrIndexMap();
      for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
              attrMap.entrySet()) {
        AttributeType attrType = mapEntry.getKey();
        if(entry.hasAttribute(attrType)) {
          AttributeIndex attributeIndex = mapEntry.getValue();
          Index index;
          if((index=attributeIndex.getEqualityIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index=attributeIndex.getPresenceIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index=attributeIndex.getSubstringIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index=attributeIndex.getOrderingIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index=attributeIndex.getApproximateIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) {
            vlvIdx.addEntry(txn, entryID, entry);
          }
          Map<String,Collection<Index>> extensibleMap =
                  attributeIndex.getExtensibleIndexes();
          if(!extensibleMap.isEmpty()) {
            Collection<Index> subIndexes =
                    attributeIndex.getExtensibleIndexes().get(
                            EXTENSIBLE_INDEXER_ID_SUBSTRING);
            if(subIndexes != null) {
              for(Index subIndex: subIndexes) {
                indexAttr(ctx, subIndex, entry, entryID);
              }
            }
            Collection<Index> sharedIndexes =
                    attributeIndex.getExtensibleIndexes().get(
                            EXTENSIBLE_INDEXER_ID_SHARED);
            if(sharedIndexes !=null) {
              for(Index sharedIndex:sharedIndexes) {
                indexAttr(ctx, sharedIndex, entry, entryID);
              }
            }
          }
        }
        finally
      }
    }
    private void indexAttr(Suffix ctx, Index index, Entry entry,
                           EntryID entryID)
            throws DatabaseException, ConfigException
    {
      insertKeySet.clear();
      index.indexer.indexEntry(entry, insertKeySet);
      for(byte[] key : insertKeySet)
      {
        processKey(ctx, index, key, entryID, indexComparator, null);
      }
    }
    private void flushIndexBuffers() throws InterruptedException,
                 ExecutionException
    {
      Iterator<Suffix> i  = dnSuffixMap.values().iterator();
      Suffix suffix = i.next();
      for(Map<DatabaseContainer, IndexBuffer> map : suffixMap.values())
      {
        for(Map.Entry<DatabaseContainer, IndexBuffer> e : map.entrySet())
        {
          cursor.close();
          DatabaseContainer container = e.getKey();
          IndexBuffer indexBuffer = e.getValue();
          if(container instanceof DN2ID)
          {
            indexBuffer.setComparator(dnComparator);
          }
          else
          {
            indexBuffer.setComparator(indexComparator);
          }
          indexBuffer.setContainer(container);
          indexBuffer.setEntryContainer(suffix.getEntryContainer());
          Future<Void> future = sortService.submit(new SortTask(indexBuffer));
          future.get();
        }
      }
    }
    private void
    processKey(Suffix ctx, DatabaseContainer container, byte[] key,
               EntryID entryID,IndexBuffer.ComparatorBuffer<byte[]> comparator,
               EntryContainer entryContainer) throws ConfigException
    {
      IndexBuffer indexBuffer;
      Map<DatabaseContainer, IndexBuffer> conMap = suffixMap.get(ctx);
      if(!conMap.containsKey(container))
      {
        indexBuffer = getNewIndexBuffer();
        conMap.put(container, indexBuffer);
      }
      else
      {
        indexBuffer = conMap.get(container);
      }
      if(!indexBuffer.isSpaceAvailable(key))
      {
        indexBuffer.setContainer(container);
        indexBuffer.setComparator(comparator);
        indexBuffer.setEntryContainer(entryContainer);
        sortService.submit(new SortTask(indexBuffer));
        indexBuffer = getNewIndexBuffer();
        conMap.remove(container);
        conMap.put(container, indexBuffer);
      }
      indexBuffer.add(key, entryID);
    }
    private IndexBuffer getNewIndexBuffer() throws ConfigException
    {
      IndexBuffer indexBuffer = freeBufQue.poll();
      if(indexBuffer.isPoison())
      {
        Message msg = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
                "Abort import - MPD");
        throw new ConfigException(msg);
      }
      return indexBuffer;
    }
    private void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
            throws ConfigException
    {
      DatabaseContainer dn2id = suffix.getDN2ID();
      byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
      processKey(suffix, dn2id, dnBytes, entryID, dnComparator,
              suffix.getEntryContainer());
    }
  }
  /**
   * The task reads the temporary index files and writes their results to the
   * index database.
   */
  private final class IndexWriteDBTask implements Callable<Void> {
    private final IndexManager indexMgr;
    private final boolean isDN2ID;
    private final DatabaseEntry dbKey, dbValue;
    private final DN2ID dn2id;
    private final Index index;
    private final EntryContainer entryContainer;
    private final int id2ChildLimit;
    private final boolean id2ChildMCount;
    private TreeMap<DN,EntryID> parentIDMap = new TreeMap<DN,EntryID>();
    private DN parentDN, lastDN;
    private EntryID parentID, lastID;
    private final Map<byte[], ImportIDSet> id2childTree;
    private final Map<byte[], ImportIDSet> id2subtreeTree;
    private final int cacheSize;
    private ByteBuffer directBuffer = null;
    public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID,
                            ByteBuffer b, int cacheSize)
    {
      this(indexMgr, isDN2ID, cacheSize);
      directBuffer = b;
    }
    public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID,
                            int cacheSize)
    {
      this.indexMgr = indexMgr;
      this.entryContainer = indexMgr.entryContainer;
      this.isDN2ID = isDN2ID;
      this.dbKey = new DatabaseEntry();
      this.dbValue = new DatabaseEntry();
      this.cacheSize = cacheSize;
      if(isDN2ID)
      {
        this.dn2id = indexMgr.dn2id;
        this.index = null;
        id2ChildLimit = entryContainer.getID2Children().getIndexEntryLimit();
        id2ChildMCount = entryContainer.getID2Subtree().getMaintainCount();
        Comparator<byte[]> id2ChildComparator =
                entryContainer.getID2Children().getComparator();
        Comparator<byte[]> id2SubtreeComparator =
                entryContainer.getID2Subtree().getComparator();
        id2childTree =
                new TreeMap<byte[], ImportIDSet>(id2ChildComparator);
        id2subtreeTree =
                new TreeMap<byte[], ImportIDSet>(id2SubtreeComparator);
      }
      else
      {
        this.dn2id = null;
        this.index = indexMgr.getIndex();
        id2subtreeTree = null;
        id2childTree = null;
        id2ChildLimit = 0;
        id2ChildMCount = false;
      }
    }
    public Void call() throws Exception
    {
      Comparator<byte[]> comparator = indexMgr.getComparator();
      int limit = indexMgr.getLimit();
      boolean maintainCount = indexMgr.getMaintainCount();
      byte[] cKey = null;
      ImportIDSet cIDSet = null;
      indexMgr.init();
      List<Buffer> bufferList = indexMgr.getBufferList();
      SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
      int p = 0;
      int offSet = cacheSize;
      for(Buffer b : bufferList)
      {
        if(directBuffer != null)
        {
          directBuffer.position(p);
          directBuffer.limit(offSet);
          ByteBuffer slice = directBuffer.slice();
          b.init(indexMgr, slice, cacheSize);
          p += cacheSize;
          offSet += cacheSize;
        }
        else
        {
          b.init(indexMgr, null, cacheSize);
        }
        bufferSet.add(b);
      }
      while(!bufferSet.isEmpty())
      {
        Buffer b;
        b = bufferSet.first();
        if(b == null) {
          System.out.println("null b");
        }
        bufferSet.remove(b);
        byte[] key = b.getKey();
        ImportIDSet idSet = b.getIDSet();
        if(cKey == null)
        {
          cKey = key;
          cIDSet = idSet;
        }
        else
        {
          if(comparator.compare(key, cKey) != 0)
          {
            addToDB(cKey, cIDSet);
            indexMgr.incrKeyCount();
            cKey = key;
            cIDSet = idSet;
          }
          else
          {
            cIDSet.setKey(cKey);
            cIDSet.merge(idSet, limit, maintainCount);
          }
        }
        if(b.hasMoreData())
        {
          b.getNextRecord();
          bufferSet.add(b);
        }
      }
      if(cKey != null)
      {
        addToDB(cKey, cIDSet);
      }
      cleanUP();
      return null;
    }
    private void cleanUP() throws DatabaseException, DirectoryException,
            IOException
    {
      if(!isDN2ID) {
        index.closeCursor();
        Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(index.getName());
        logError(msg);
      }
      else
      {
        if(dn2idPhase2)
        {
          flushSubTreeChildIndexes();
        }
      }
      indexMgr.setDone();
      indexMgr.close();
      indexMgr.deleteIndexFile();
    }
    private void flushSubTreeChildIndexes()
            throws DatabaseException, DirectoryException
    {
      Index  id2child = entryContainer.getID2Children();
      Set<Map.Entry<byte[], ImportIDSet>> id2childSet =
              id2childTree.entrySet();
      for(Map.Entry<byte[], ImportIDSet> e : id2childSet)
      {
        byte[] key = e.getKey();
        ImportIDSet idSet = e.getValue();
        dbKey.setData(key);
        id2child.insert(dbKey, idSet, dbValue);
      }
      id2child.closeCursor();
      Index id2subtree = entryContainer.getID2Subtree();
      Set<Map.Entry<byte[], ImportIDSet>> subtreeSet =
              id2subtreeTree.entrySet();
      for(Map.Entry<byte[], ImportIDSet> e : subtreeSet)
      {
        byte[] key = e.getKey();
        ImportIDSet idSet = e.getValue();
        dbKey.setData(key);
        id2subtree.insert(dbKey, idSet, dbValue);
      }
      id2subtree.closeCursor();
      Message msg =
             NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getTotDNCount());
      logError(msg);
    }
    private void addToDB(byte[] key, ImportIDSet record)
            throws InterruptedException, DatabaseException, DirectoryException
    {
      record.setKey(key);
      if(!this.isDN2ID)
      {
        addIndex(record);
      }
      else
      {
        if(dn2idPhase2)
        {
          addDN2ID(record);
        }
      }
    }
    private void id2Subtree(EntryContainer ec, EntryID childID,
                            int limit, boolean mCount) throws DatabaseException
    {
      ImportIDSet idSet;
      if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
      {
        idSet = new ImportIDSet();
        id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
      }
      else
      {
        idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
      }
      idSet.addEntryID(childID, limit, mCount);
      for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
           dn = ec.getParentWithinBase(dn))
      {
        EntryID nodeID = parentIDMap.get(dn);
        if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
        {
          idSet = new ImportIDSet();
          id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
        }
        else
        {
          idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
        }
        idSet.addEntryID(childID, limit, mCount);
      }
    }
    private void id2child(EntryID childID, int limit, boolean mCount)
    {
      ImportIDSet idSet;
      if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
      {
        idSet = new ImportIDSet();
        id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
      }
      else
      {
        idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
      }
      idSet.addEntryID(childID, limit, mCount);
    }
    private boolean checkParent(DN dn, EntryID id, EntryContainer ec)
    {
      if(parentIDMap.isEmpty())
      {
        parentIDMap.put(dn, id);
        return true;
      }
      else if(lastDN != null && lastDN.isAncestorOf(dn))
      {
        parentIDMap.put(lastDN, lastID);
        parentDN = lastDN;
        parentID = lastID;
        lastDN = dn;
        lastID = id;
        return true;
      }
      else if(parentIDMap.lastKey().isAncestorOf(dn))
      {
        parentDN = parentIDMap.lastKey();
        parentID = parentIDMap.get(parentDN);
        lastDN = dn;
        lastID = id;
        return true;
      }
      else
      {
        DN pDN = ec.getParentWithinBase(dn);
        if(parentIDMap.containsKey(pDN)) {
          DN lastKey = parentIDMap.lastKey();
          Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey);
          for(Map.Entry<DN, EntryID> e : subMap.entrySet())
          {
            subMap.remove(e.getKey());
          }
          parentDN = pDN;
          parentID = parentIDMap.get(pDN);
          lastDN = dn;
          lastID = id;
        }
        else
        {
          Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString());
          Entry e = new Entry(dn, null, null, null);
          reader.rejectEntry(e, msg);
          return false;
        }
      }
      return true;
    }
    private void addDN2ID(ImportIDSet record)
            throws DatabaseException, DirectoryException
    {
      DatabaseEntry idVal = new DatabaseEntry();
      dbKey.setData(record.getKey());
      idVal.setData(record.toDatabase());
      DN dn = DN.decode(ByteString.wrap(dbKey.getData()));
      EntryID entryID = new EntryID(idVal);
      if(!checkParent(dn, entryID, entryContainer))
      {
        return;
      }
      dn2id.putRaw(null, dbKey, idVal);
      indexMgr.addTotDNCount(1);
      if(parentDN != null)
      {
        id2child(entryID, id2ChildLimit, id2ChildMCount);
        id2Subtree(entryContainer,
                entryID, id2ChildLimit, id2ChildMCount);
      }
    }
    private void addIndex(ImportIDSet record) throws DatabaseException
    {
      dbKey.setData(record.getKey());
      index.insert(dbKey, record, dbValue);
    }
  }
  /**
   * This task writes the temporary index files using the sorted buffers read
   * from a blocking queue.
   */
  private final class IndexFileWriterTask implements Runnable
  {
    private final IndexManager indexMgr;
    private final BlockingQueue<IndexBuffer> que;
    private final ByteArrayOutputStream byteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final DataOutputStream dataStream;
    private long bufCount = 0;
    private final File file;
    private final SortedSet<IndexBuffer> indexSortedSet;
    private boolean poisonSeen = false;
    public IndexFileWriterTask(BlockingQueue<IndexBuffer> que,
                            IndexManager indexMgr) throws FileNotFoundException
    {
      this.que = que;
      file = indexMgr.getFile();
      this.indexMgr = indexMgr;
      BufferedOutputStream bufferedStream =
                   new BufferedOutputStream(new FileOutputStream(file), 2 * MB);
      dataStream = new DataOutputStream(bufferedStream);
      indexSortedSet = new TreeSet<IndexBuffer>();
    }
    public void run()
    {
      long offset = 0;
      List<IndexBuffer> l = new LinkedList<IndexBuffer>();
      try {
        while(true)
        {
          IndexBuffer indexBuffer = que.poll();
          if(indexBuffer != null)
          {
            long beginOffset = offset;
            long bufLen;
            if(!que.isEmpty())
            {
              que.drainTo(l, DRAIN_TO);
              l.add(indexBuffer);
              bufLen = writeIndexBuffers(l);
              for(IndexBuffer id : l)
              {
                id.reset();
              }
              freeBufQue.addAll(l);
              l.clear();
              if(poisonSeen)
              {
                break;
              }
            }
            else
            {
              if(indexBuffer.isPoison())
              {
                break;
              }
              bufLen = writeIndexBuffer(indexBuffer);
              indexBuffer.reset();
              freeBufQue.add(indexBuffer);
            }
            offset += bufLen;
            indexMgr.addBuffer(new Buffer(beginOffset, offset, bufCount));
            bufCount++;
            bufferCount.incrementAndGet();
          }
        }
        dataStream.close();
        indexMgr.setFileLength();
      }
      catch (IOException e) {
        Message msg =
                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
                                    e.getMessage());
        logError(msg);
      }
    }
    private long writeIndexBuffer(IndexBuffer indexBuffer) throws IOException
    {
      int numKeys = indexBuffer.getNumberKeys();
      indexBuffer.setPos(-1);
      long bufLen = 0;
      byteStream.reset();
      for(int i = 0; i < numKeys; i++)
      {
        if(indexBuffer.getPos() == -1)
        {
          indexBuffer.setPos(i);
          byteStream.write(indexBuffer.getID(i));
          continue;
        }
        if(!indexBuffer.compare(i))
        {
          int recLen = indexBuffer.getKeySize();
          recLen += byteStream.size();
          recLen += 8;
          bufLen += recLen;
          indexBuffer.writeKey(dataStream);
          dataStream.writeInt(byteStream.size());
          byteStream.writeTo(dataStream);
          indexBuffer.setPos(i);
          byteStream.reset();
        }
        byteStream.write(indexBuffer.getID(i));
      }
      if(indexBuffer.getPos() != -1)
      {
        int recLen = indexBuffer.getKeySize();
        recLen += byteStream.size();
        recLen += 8;
        bufLen += recLen;
        indexBuffer.writeKey(dataStream);
        dataStream.writeInt(byteStream.size());
        byteStream.writeTo(dataStream);
      }
      return bufLen;
    }
    private long writeIndexBuffers(List<IndexBuffer> buffers)
            throws IOException
    {
      long id = 0;
      long bufLen = 0;
      byteStream.reset();
      for(IndexBuffer b : buffers)
      {
        if(b.isPoison())
        {
          poisonSeen = true;
        }
        else
        {
          b.setPos(0);
          b.setID(id++);
          indexSortedSet.add(b);
        }
      }
      byte[] saveKey = null;
      while(!indexSortedSet.isEmpty())
      {
        IndexBuffer b = indexSortedSet.first();
        indexSortedSet.remove(b);
        byte[] key = b.getKeyBytes(b.getPos());
        if(saveKey == null)
        {
          saveKey = key;
          byteStream.write(b.getID(b.getPos()));
        }
        else
        {
          if(!b.compare(saveKey))
          {
            int recLen = saveKey.length;
            recLen += byteStream.size();
            recLen += 8;
            bufLen += recLen;
            dataStream.writeInt(saveKey.length);
            dataStream.write(saveKey);
            dataStream.writeInt(byteStream.size());
            byteStream.writeTo(dataStream);
            byteStream.reset();
            saveKey = key;
            byteStream.write(b.getID(b.getPos()));
          }
          else
          {
            byteStream.write(b.getID(b.getPos()));
          }
        }
        if(b.hasMoreData())
        {
          b.getNextRecord();
          indexSortedSet.add(b);
        }
      }
      if(saveKey != null)
      {
        int recLen = saveKey.length;
        recLen += byteStream.size();
        recLen += 8;
        bufLen += recLen;
        dataStream.writeInt(saveKey.length);
        dataStream.write(saveKey);
        dataStream.writeInt(byteStream.size());
        byteStream.writeTo(dataStream);
      }
      return bufLen;
    }
  }
  /**
   * This task main function is to sort the index buffers given to it from
   * the import tasks reading the LDIF file. It will also create a index
   * file writer task and corresponding queue if needed. The sorted index
   * buffers are put on the index file writer queues for writing to a temporary
   * file.
   */
  private final class SortTask implements Callable<Void>
  {
    private final IndexBuffer indexBuffer;
    public SortTask(IndexBuffer indexBuffer)
    {
      this.indexBuffer = indexBuffer;
    }
    /**
     * {@inheritDoc}
     */
    public Void call() throws Exception
    {
      if (config.isCancelled())
      {
        return null;
      }
      indexBuffer.sort();
      if(containerQueMap.containsKey(indexBuffer.getContainer())) {
        BlockingQueue<IndexBuffer> q =
                containerQueMap.get(indexBuffer.getContainer());
        q.add(indexBuffer);
      }
      else
      {
        DatabaseContainer container = indexBuffer.getContainer();
        EntryContainer entryContainer = indexBuffer.getEntryContainer();
        createIndexWriterTask(container, entryContainer);
        BlockingQueue<IndexBuffer> q = containerQueMap.get(container);
        q.add(indexBuffer);
      }
      return null;
    }
    private void createIndexWriterTask(DatabaseContainer container,
                                       EntryContainer entryContainer)
    throws FileNotFoundException
    {
      synchronized(container) {
        if(containerQueMap.containsKey(container))
        {
          return;
        }
        IndexManager indexMgr;
        if(container instanceof Index)
        {
          Index index = (Index) container;
          indexMgr = new IndexManager(index);
        }
        else
        {
          DN2ID dn2id = (DN2ID) container;
          indexMgr = new IndexManager(dn2id, entryContainer);
        }
        containerIndexMgrMap.put(container, indexMgr);
        BlockingQueue<IndexBuffer> newQue =
                new ArrayBlockingQueue<IndexBuffer>(threadCount + 5);
        IndexFileWriterTask indexWriter =
                new IndexFileWriterTask(newQue, indexMgr);
        indexWriterList.add(indexWriter);
        indexWriterFutures.add(indexProcessService.submit(indexWriter));
        containerQueMap.put(container, newQue);
      }
    }
  }
  /**
   * The buffer class is used to process a buffer from the temporary index files
   * during phase 2 processing.
   */
  private final class Buffer implements Comparable<Buffer>
  {
    private IndexManager indexMgr;
    private final long begin, end, id;
    private long offset;
    private ByteBuffer cache;
    private int keyLen, idLen;
    private byte[] key;
    private ImportIDSet idSet;
    public Buffer(long begin, long end, long id)
    {
      this.begin = begin;
      this.end = end;
      this.offset = 0;
      this.id = id;
    }
    private void init(IndexManager indexMgr, ByteBuffer b,
                      long cacheSize) throws IOException
    {
      this.indexMgr = indexMgr;
      if(b == null)
      {
        cache = ByteBuffer.allocate((int)cacheSize);
      }
      else
      {
        cache = b;
      }
      loadCache();
      cache.flip();
      getNextRecord();
    }
    private void loadCache() throws IOException
    {
      FileChannel fileChannel = indexMgr.getChannel();
      fileChannel.position(begin + offset);
      long leftToRead =  end - (begin + offset);
      long bytesToRead;
      if(leftToRead < cache.remaining())
      {
        int pos = cache.position();
        cache.limit((int) (pos + leftToRead));
        bytesToRead = (int)leftToRead;
      }
      else
      {
        bytesToRead = Math.min((end - offset),cache.remaining());
      }
      int bytesRead = 0;
      while(bytesRead < bytesToRead)
      {
        bytesRead += fileChannel.read(cache);
      }
      offset += bytesRead;
      indexMgr.addBytesRead(bytesRead);
    }
    public boolean hasMoreData() throws IOException
    {
      boolean ret = ((begin + offset) >= end) ? true: false;
      if(cache.remaining() == 0 && ret)
      {
        return false;
      }
      else
      {
        return true;
      }
    }
    public byte[] getKey()
    {
      return key;
    }
    public ImportIDSet getIDSet()
    {
      return idSet;
    }
    public long getBufID()
    {
      return id;
    }
    public void getNextRecord()  throws IOException
    {
      getNextKey();
      getNextIDSet();
    }
    private int getInt()  throws IOException
    {
      ensureData(4);
      return cache.getInt();
    }
    private long getLong()  throws IOException
    {
      ensureData(8);
      return cache.getLong();
    }
    private void getBytes(byte[] b) throws IOException
    {
      ensureData(b.length);
      cache.get(b);
    }
    private void getNextKey() throws IOException, BufferUnderflowException
    {
      keyLen = getInt();
      key = new byte[keyLen];
        getBytes(key);
    }
    private void getNextIDSet() throws IOException, BufferUnderflowException
    {
      idLen = getInt();
      int idCount = idLen/8;
      idSet = new ImportIDSet(idCount);
      for(int i = 0; i < idCount; i++)
      {
        long l = getLong();
        idSet.addEntryID(l, indexMgr.getLimit(), indexMgr.getMaintainCount());
      }
    }
    private void ensureData(int len) throws IOException
    {
      if(cache.remaining() == 0)
      {
        cache.clear();
        loadCache();
        cache.flip();
      }
      else if(cache.remaining() < len)
      {
        cache.compact();
        loadCache();
        cache.flip();
      }
    }
    public int compareTo(Buffer o) {
      if(key == null) {
        if(id == o.getBufID())
        {
          return 0;
        }
        else
        {
          return id > o.getBufID() ? 1 : -1;
        }
      }
      if(this.equals(o))
      {
        return 0;
      }
      int rc = indexMgr.getComparator().compare(key, o.getKey());
      if(rc == 0)
      {
        if(idSet.isDefined())
        {
          return -1;
        }
        else if(o.getIDSet().isDefined())
        {
          return 1;
        }
        else if(idSet.size() == o.getIDSet().size())
        {
          rc = id > o.getBufID() ? 1 : -1;
        }
        else
        {
          rc = idSet.size() - o.getIDSet().size();
        }
      }
      return rc;
    }
  }
  /**
   * The index manager class is used to carry information about index processing
   * from phase 1 to phase 2.
   */
  private final class IndexManager
  {
    private final Index index;
    private final DN2ID dn2id;
    private final EntryContainer entryContainer;
    private final File file;
    private RandomAccessFile raf = null;
    private final List<Buffer> bufferList = new LinkedList<Buffer>();
    private final int limit;
    private long fileLength, bytesRead = 0;
    private final boolean maintainCount;
    private final Comparator<byte[]> comparator;
    private boolean done = false;
    private long totalDNS;
    private AtomicInteger keyCount = new AtomicInteger(0);
    private final String name;
    public IndexManager(Index index)
    {
      this.index = index;
      dn2id = null;
      file = new File(tempDir, index.getName());
      name = index.getName();
      limit = index.getIndexEntryLimit();
      maintainCount = index.getMaintainCount();
      comparator = index.getComparator();
      entryContainer = null;
    }
    public IndexManager(DN2ID dn2id, EntryContainer entryContainer)
    {
      index = null;
      this.dn2id = dn2id;
      file = new File(tempDir, dn2id.getName());
      limit = 1;
      maintainCount = false;
      comparator = dn2id.getComparator();
      this.entryContainer = entryContainer;
      name = dn2id.getName();
    }
    public void init() throws FileNotFoundException
    {
      raf = new RandomAccessFile(file, "r");
    }
    public FileChannel getChannel()
    {
      return raf.getChannel();
    }
    public void addBuffer(Buffer o)
    {
      this.bufferList.add(o);
    }
    public List<Buffer> getBufferList()
    {
      return bufferList;
    }
    public File getFile()
    {
      return file;
    }
    public void deleteIndexFile()
    {
       file.delete();
    }
    public void close() throws IOException
    {
        raf.close();
    }
    public int getLimit()
    {
      return limit;
    }
    public boolean getMaintainCount()
    {
      return maintainCount;
    }
    public Comparator<byte[]> getComparator()
    {
      return comparator;
    }
    public Index getIndex()
    {
      return index;
    }
    public void setFileLength()
    {
      this.fileLength = file.length();
    }
    public void addBytesRead(int bytesRead)
    {
      this.bytesRead += bytesRead;
    }
    public void setDone()
    {
      this.done = true;
    }
    public void addTotDNCount(int delta)
    {
      this.totalDNS += delta;
    }
    public long getTotDNCount()
    {
      return totalDNS;
    }
    public void printStats(long deltaTime)
    {
      if(!done)
      {
        float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
        Message msg = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(name,
                       (fileLength - bytesRead), rate);
        logError(msg);
      }
    }
    public void incrKeyCount()
    {
      keyCount.incrementAndGet();
    }
  }
  /**
   * This class reports progress of the import job at fixed intervals.
   */
  private final class ProgressTask extends TimerTask
  private final class FirstPhaseProgressTask extends TimerTask
  {
    /**
     * The number of entries that had been read at the time of the
@@ -993,89 +1795,72 @@
    private EnvironmentStats prevEnvStats;
    /**
     * The number of bytes in a megabyte.
     * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
     * The number of bytes in a megabyte. Note that 1024*1024 bytes may
     * eventually become known as a mebibyte(MiB).
     */
    public static final int bytesPerMegabyte = 1024*1024;
    public static final int bytesPerMegabyte = 1024 * 1024;
    //Determines if the ldif is being read.
    // Determines if the ldif is being read.
    private boolean ldifRead = false;
    //Determines if eviction has been detected.
    // Determines if eviction has been detected.
    private boolean evicting = false;
    //Entry count when eviction was detected.
    // Entry count when eviction was detected.
    private long evictionEntryCount = 0;
    //Suspend output.
    // Suspend output.
    private boolean pause = false;
    /**
     * Create a new import progress task.
     * @throws DatabaseException If an error occurs in the JE database.
     */
    public ProgressTask() throws DatabaseException
    public FirstPhaseProgressTask()
    {
      previousTime = System.currentTimeMillis();
      prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
      try
      {
        prevEnvStats =
                rootContainer.getEnvironmentStats(new StatsConfig());
      }
      catch (DatabaseException e)
      {
        throw new RuntimeException(e);
      }
    }
    /**
     * Return if reading the LDIF file.
     */
    public void ldifRead() {
      ldifRead=true;
    }
   /**
    * Return value of evicting flag.
    *
    * @return <CODE>True</CODE> if eviction is detected.
    */
    public  boolean isEvicting() {
     return evicting;
    }
    /**
     * Return count of entries when eviction was detected.
     *
     * @return  The entry count when eviction was detected.
     */
    public long getEvictionEntryCount() {
      return evictionEntryCount;
    }
    /**
     * Suspend output if true.
     *
     * @param v The value to set the suspend value to.
     */
    public void setPause(boolean v) {
    pause=v;
   }
    /**
     * The action to be performed by this timer task.
     */
    public void run() {
    @Override
    public void run()
    {
      long latestCount = reader.getEntriesRead() + 0;
      long deltaCount = (latestCount - previousCount);
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
      Message message;
      if (deltaTime == 0) {
      if (deltaTime == 0)
      {
        return;
      }
      if(pause) {
      if (pause)
      {
        return;
      }
      if(!ldifRead) {
        long numRead     = reader.getEntriesRead();
        long numIgnored  = reader.getEntriesIgnored();
      if (!ldifRead)
      {
        long numRead = reader.getEntriesRead();
        long numIgnored = reader.getEntriesIgnored();
        long numRejected = reader.getEntriesRejected();
         float rate = 1000f*deltaCount / deltaTime;
         message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(
            numRead, numIgnored, numRejected, 0, rate);
        float rate = 1000f * deltaCount / deltaTime;
        message =
                NOTE_JEB_IMPORT_PROGRESS_REPORT.get(numRead, numIgnored,
                        numRejected, 0, rate);
        logError(message);
      }
      try
@@ -1083,16 +1868,18 @@
        Runtime runtime = Runtime.getRuntime();
        long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
        EnvironmentStats envStats =
            rootContainer.getEnvironmentStats(new StatsConfig());
                rootContainer.getEnvironmentStats(new StatsConfig());
        long nCacheMiss =
            envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
                envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
        float cacheMissRate = 0;
        if (deltaCount > 0) {
          cacheMissRate = nCacheMiss/(float)deltaCount;
        if (deltaCount > 0)
        {
          cacheMissRate = nCacheMiss / (float) deltaCount;
        }
        message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
            freeMemory, cacheMissRate);
        message =
                NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
                        cacheMissRate);
        logError(message);
        long evictPasses = envStats.getNEvictPasses();
        long evictNodes = envStats.getNNodesExplicitlyEvicted();
@@ -1102,37 +1889,196 @@
        long cleanerEntriesRead = envStats.getNCleanerEntriesRead();
        long cleanerINCleaned = envStats.getNINsCleaned();
        long checkPoints = envStats.getNCheckpoints();
        if(evictPasses != 0) {
          if(!evicting) {
            evicting=true;
            if(!ldifRead) {
              evictionEntryCount=reader.getEntriesRead();
        if (evictPasses != 0)
        {
          if (!evicting)
          {
            evicting = true;
            if (!ldifRead)
            {
              evictionEntryCount = reader.getEntriesRead();
              message =
                 NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
                      NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED
                              .get(evictionEntryCount);
              logError(message);
            }
          }
          message =
                  NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
                          evictNodes, evictBinsStrip);
                  NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
                          evictPasses, evictNodes, evictBinsStrip);
          logError(message);
        }
        if(cleanerRuns != 0) {
          message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
                  cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
        if (cleanerRuns != 0)
        {
          message =
                  NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
                          cleanerDeletions, cleanerEntriesRead,
                          cleanerINCleaned);
          logError(message);
        }
        if(checkPoints  > 1) {
          message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
        if (checkPoints > 1)
        {
          message =
                  NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
          logError(message);
        }
        prevEnvStats = envStats;
      } catch (DatabaseException e) {
      }
      catch (DatabaseException e)
      {
        // Unlikely to happen and not critical.
      }
      previousCount = latestCount;
      previousTime = latestTime;
    }
  }
}
  /**
   * This class reports progress of the import job at fixed intervals.
   */
  private final class SecondPhaseProgressTask extends TimerTask
  {
    /**
     * The number of entries that had been read at the time of the
     * previous progress report.
     */
    private long previousCount = 0;
    /**
     * The time in milliseconds of the previous progress report.
     */
    private long previousTime;
    /**
     * The environment statistics at the time of the previous report.
     */
    private EnvironmentStats prevEnvStats;
    /**
     * The number of bytes in a megabyte. Note that 1024*1024 bytes may
     * eventually become known as a mebibyte(MiB).
     */
    public static final int bytesPerMegabyte = 1024 * 1024;
    // Determines if eviction has been detected.
    private boolean evicting = false;
    // Suspend output.
    private boolean pause = false;
    private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap;
    /**
     * Create a new import progress task.
     * @param containerIndexMgrMap Map of database container objects to
     *                             index manager objects.
     */
    public SecondPhaseProgressTask(Map<DatabaseContainer,
            IndexManager> containerIndexMgrMap)
    {
      previousTime = System.currentTimeMillis();
      this.containerIndexMgrMap = containerIndexMgrMap;
      try
      {
        prevEnvStats =
                rootContainer.getEnvironmentStats(new StatsConfig());
      }
      catch (DatabaseException e)
      {
        throw new RuntimeException(e);
      }
    }
    /**
     * The action to be performed by this timer task.
     */
    @Override
    public void run()
    {
      long latestCount = reader.getEntriesRead() + 0;
      long deltaCount = (latestCount - previousCount);
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
      Message message;
      if (deltaTime == 0)
      {
        return;
      }
      if (pause)
      {
        return;
      }
      try
      {
        Runtime runtime = Runtime.getRuntime();
        long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
        EnvironmentStats envStats =
                rootContainer.getEnvironmentStats(new StatsConfig());
        long nCacheMiss =
                envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
        float cacheMissRate = 0;
        if (deltaCount > 0)
        {
          cacheMissRate = nCacheMiss / (float) deltaCount;
        }
        message =
                NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
                        cacheMissRate);
        logError(message);
        long evictPasses = envStats.getNEvictPasses();
        long evictNodes = envStats.getNNodesExplicitlyEvicted();
        long evictBinsStrip = envStats.getNBINsStripped();
        long cleanerRuns = envStats.getNCleanerRuns();
        long cleanerDeletions = envStats.getNCleanerDeletions();
        long cleanerEntriesRead = envStats.getNCleanerEntriesRead();
        long cleanerINCleaned = envStats.getNINsCleaned();
        long checkPoints = envStats.getNCheckpoints();
        if (evictPasses != 0)
        {
          if (!evicting)
          {
            evicting = true;
          }
          message =
                  NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
                          evictPasses, evictNodes, evictBinsStrip);
          logError(message);
        }
        if (cleanerRuns != 0)
        {
          message =
                  NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
                          cleanerDeletions, cleanerEntriesRead,
                          cleanerINCleaned);
          logError(message);
        }
        if (checkPoints > 1)
        {
          message =
                  NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
          logError(message);
        }
        prevEnvStats = envStats;
      }
      catch (DatabaseException e)
      {
        // Unlikely to happen and not critical.
      }
      previousCount = latestCount;
      previousTime = latestTime;
      for(Map.Entry<DatabaseContainer, IndexManager> e :
              containerIndexMgrMap.entrySet())
      {
        IndexManager indexMgr = e.getValue();
        indexMgr.printStats(deltaTime);
      }
    }
  }
}