mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
05.08.2010 06a2720e00f89a73a5617aabc4ee91cbac62fbee
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -28,27 +28,35 @@
package org.opends.server.backends.jeb.importLDIF;
import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.util.DynamicConstants.BUILD_ID;
import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import java.io.*;
import java.nio.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import org.opends.messages.Message;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn;
import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
import org.opends.server.admin.std.server.LocalDBIndexCfg;
import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn;
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.*;
import org.opends.server.util.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.Platform;
import org.opends.server.util.StaticUtils;
import com.sleepycat.je.*;
import com.sleepycat.util.PackedInteger;
@@ -57,10 +65,10 @@
 * This class provides the engine that performs both importing of LDIF files and
 * the rebuilding of indexes.
 */
public class Importer
public final class Importer
{
  private static final int TIMER_INTERVAL = 10000;
  final static int KB = 1024;
  private static final int KB = 1024;
  private static final int MB =  (KB * KB);
  private static final String DEFAULT_TMP_DIR = "import-tmp";
  private static final String TMPENV_DIR = "tmp-env";
@@ -72,31 +80,25 @@
  //Defaults for LDIF reader buffers, min memory required to import and default
  //size for byte buffers.
  private static final int READER_WRITER_BUFFER_SIZE = 1 * MB;
  private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
  private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE +
                                                 MAX_DB_LOG_SIZE;
  private static final int BYTE_BUFFER_CAPACITY = 128;
  //Min and MAX sizes of phase one buffer.
  private static final int MAX_BUFFER_SIZE = 100 * MB;
  private static final int MIN_BUFFER_SIZE = 8 * KB;
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  //Min size of phase two read-ahead cache.
  private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
  //Set aside this much for the JVM from free memory.
  private static final int JVM_MEM_PCT = 45;
  //Percent of import memory to use for temporary environment if the
  //skip DN validation flag isn't specified.
  private static final int TMPENV_MEM_PCT = 50;
  //Small heap threshold used to give more memory to JVM to attempt OOM errors.
  private static final int SMALL_HEAP_SIZE = 256 * MB;
  //The DN attribute type.
  private static AttributeType dnType;
  private static final IndexBuffer.IndexComparator indexComparator =
          new IndexBuffer.IndexComparator();
  static final IndexOutputBuffer.IndexComparator indexComparator =
          new IndexOutputBuffer.IndexComparator();
  //Phase one buffer and imported entries counts.
  private final AtomicInteger bufferCount = new AtomicInteger(0);
@@ -124,15 +126,20 @@
  //Import configuration.
  private final LDIFImportConfig importConfiguration;
  //Backend configuration.
  private final LocalDBBackendCfg backendConfiguration;
  //LDIF reader.
  private LDIFReader reader;
  //Migrated entry count.
  private int migratedCount;
  //Size in bytes of temporary env, DB cache, DB log buf size.
  private long tmpEnvCacheSize = 0, dbCacheSize = MAX_DB_CACHE_SIZE,
               dbLogBufSize = MAX_DB_LOG_SIZE;
  // Size in bytes of temporary env.
  private long tmpEnvCacheSize;
  // Size in bytes of DB cache.
  private long dbCacheSize;
  //The executor service used for the buffer sort tasks.
  private ExecutorService bufferSortService;
@@ -141,14 +148,14 @@
  private ExecutorService scratchFileWriterService;
  //Queue of free index buffers -- used to re-cycle index buffers;
  private final BlockingQueue<IndexBuffer> freeBufferQueue =
          new LinkedBlockingQueue<IndexBuffer>();
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
          new LinkedBlockingQueue<IndexOutputBuffer>();
  //Map of index keys to index buffers.  Used to allocate sorted
  //index buffers to a index writer thread.
  private final
  Map<IndexKey, BlockingQueue<IndexBuffer>> indexKeyQueMap =
          new ConcurrentHashMap<IndexKey, BlockingQueue<IndexBuffer>>();
  Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
          new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
  //Map of DB containers to index managers. Used to start phase 2.
  private final List<IndexManager> indexMgrList =
@@ -202,188 +209,158 @@
    }
  }
  //Rebuild-index instance.
  private
  Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
            EnvironmentConfig envConfig) throws
          InitializationException, JebException, ConfigException
  /**
   * Create a new import job with the specified rebuild index config.
   *
   * @param rebuildConfig
   *          The rebuild index configuration.
   * @param cfg
   *          The local DB back-end configuration.
   * @param envConfig
   *          The JEB environment config.
   * @throws InitializationException
   *           If a problem occurs during initialization.
   * @throws JebException
   *           If an error occurred when opening the DB.
   * @throws ConfigException
   *           If a problem occurs during initialization.
   */
  public Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
      EnvironmentConfig envConfig) throws  InitializationException,
      JebException, ConfigException
  {
    importConfiguration = null;
    tmpEnv = null;
    threadCount = 1;
    rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
    indexCount = rebuildManager.getIndexCount();
    scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(indexCount);
    scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
    this.importConfiguration = null;
    this.backendConfiguration = cfg;
    this.tmpEnv = null;
    this.threadCount = 1;
    this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
    this.indexCount = rebuildManager.getIndexCount();
    this.scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(
        indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
    File parentDir;
    if(rebuildConfig.getTmpDirectory() == null)
    if (rebuildConfig.getTmpDirectory() == null)
    {
      parentDir = getFileForPath(DEFAULT_TMP_DIR);
    }
    else
    {
       parentDir = getFileForPath(rebuildConfig.getTmpDirectory());
      parentDir = getFileForPath(rebuildConfig.getTmpDirectory());
    }
    tempDir = new File(parentDir, cfg.getBackendId());
    this.tempDir = new File(parentDir, cfg.getBackendId());
    recursiveDelete(tempDir);
    if(!tempDir.exists() && !tempDir.mkdirs())
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      Message message =
                ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String.valueOf(tempDir));
      Message message = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String
          .valueOf(tempDir));
      throw new InitializationException(message);
    }
    skipDNValidation = true;
    if(envConfig != null)
    {
      initializeDBEnv(envConfig);
    }
    this.skipDNValidation = true;
    initializeDBEnv(envConfig);
  }
  /**
   * Create a new import job with the specified ldif import config.
   *
   * @param importConfiguration The LDIF import configuration.
   * @param localDBBackendCfg The local DB back-end configuration.
   * @param envConfig The JEB environment config.
   * @throws  InitializationException If a problem occurs during initialization.
   * @param importConfiguration
   *          The LDIF import configuration.
   * @param localDBBackendCfg
   *          The local DB back-end configuration.
   * @param envConfig
   *          The JEB environment config.
   * @throws InitializationException
   *           If a problem occurs during initialization.
   * @throws ConfigException
   *           If a problem occurs reading the configuration.
   * @throws DatabaseException
   *           If an error occurred when opening the DB.
   */
  private Importer(LDIFImportConfig importConfiguration,
                   LocalDBBackendCfg localDBBackendCfg,
                   EnvironmentConfig envConfig) throws
          InitializationException, DatabaseException
  public Importer(LDIFImportConfig importConfiguration,
      LocalDBBackendCfg localDBBackendCfg, EnvironmentConfig envConfig)
      throws InitializationException, ConfigException, DatabaseException
  {
    rebuildManager = null;
    this.rebuildManager = null;
    this.importConfiguration = importConfiguration;
    if(importConfiguration.getThreadCount() == 0)
    this.backendConfiguration = localDBBackendCfg;
    if (importConfiguration.getThreadCount() == 0)
    {
      threadCount = Runtime.getRuntime().availableProcessors() * 2;
      this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
    }
    else
    {
      threadCount = importConfiguration.getThreadCount();
      this.threadCount = importConfiguration.getThreadCount();
    }
    indexCount = localDBBackendCfg.listLocalDBIndexes().length + 2;
    if(!importConfiguration.appendToExistingData()) {
      if(importConfiguration.clearBackend() ||
              localDBBackendCfg.getBaseDN().size() <= 1) {
        clearedBackend = true;
    // Determine the number of indexes.
    int indexes = 2; // dn2id + dn2uri
    for (String indexName : localDBBackendCfg.listLocalDBIndexes())
    {
      LocalDBIndexCfg index = localDBBackendCfg.getLocalDBIndex(indexName);
      SortedSet<IndexType> types = index.getIndexType();
      if (types.contains(IndexType.EXTENSIBLE))
      {
        indexes += types.size() - 1
            + index.getIndexExtensibleMatchingRule().size();
      }
      else
      {
        indexes += types.size();
      }
    }
    scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(indexCount);
    scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
    this.indexCount = indexes;
    if (!importConfiguration.appendToExistingData())
    {
      if (importConfiguration.clearBackend()
          || localDBBackendCfg.getBaseDN().size() <= 1)
      {
        this.clearedBackend = true;
      }
    }
    this.scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(
        indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
    File parentDir;
    if(importConfiguration.getTmpDirectory() == null)
    if (importConfiguration.getTmpDirectory() == null)
    {
      parentDir = getFileForPath(DEFAULT_TMP_DIR);
    }
    else
    {
       parentDir = getFileForPath(importConfiguration.getTmpDirectory());
      parentDir = getFileForPath(importConfiguration.getTmpDirectory());
    }
    tempDir = new File(parentDir, localDBBackendCfg.getBackendId());
    this.tempDir = new File(parentDir, localDBBackendCfg.getBackendId());
    recursiveDelete(tempDir);
    if(!tempDir.exists() && !tempDir.mkdirs())
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      Message message =
                ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String.valueOf(tempDir));
      Message message = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String
          .valueOf(tempDir));
      throw new InitializationException(message);
    }
    skipDNValidation = importConfiguration.getSkipDNValidation();
    initializeDBEnv(envConfig);
    //Set up temporary environment.
    if(!skipDNValidation)
    // Set up temporary environment.
    if (!skipDNValidation)
    {
      File envPath = new File(tempDir, TMPENV_DIR);
      envPath.mkdirs();
      tmpEnv = new TmpEnv(envPath);
      this.tmpEnv = new TmpEnv(envPath);
    }
    else
    {
      tmpEnv = null;
      this.tmpEnv = null;
    }
  }
  /**
   * Return and import LDIF instance using the specified arguments.
   *
   * @param importCfg The import config to use.
   * @param localDBBackendCfg The local DB backend config to use.
   * @param envCfg The JEB environment config to use.
   * @return A import LDIF instance.
   *
   * @throws InitializationException If the instance cannot be initialized.
   */
  public static
  Importer getInstance(LDIFImportConfig importCfg,
                       LocalDBBackendCfg localDBBackendCfg,
                       EnvironmentConfig envCfg)
          throws InitializationException
  {
     return  new Importer(importCfg, localDBBackendCfg, envCfg);
  }
  /**
   * Return an import rebuild index instance using the specified arguments.
   *
   * @param rebuildCfg The rebuild config to use.
   * @param localDBBackendCfg The local DB backend config to use.
   * @param envCfg The JEB environment config to use.
   * @return An import rebuild index instance.
   *
   * @throws InitializationException If the instance cannot be initialized.
   * @throws JebException If a JEB exception occurs.
   * @throws ConfigException If the instance cannot be configured.
   */
  public static synchronized
  Importer getInstance(RebuildConfig rebuildCfg,
                       LocalDBBackendCfg localDBBackendCfg,
                       EnvironmentConfig envCfg)
  throws InitializationException, JebException, ConfigException
  {
      return new Importer(rebuildCfg, localDBBackendCfg, envCfg);
  }
  private void adjustBufferSize(long availMem)
  {
     int oldThreadCount = threadCount;
     for(;threadCount > 0; threadCount--)
     {
       phaseOneBufferCount = 2 * (indexCount * threadCount);
       bufferSize = (int) (availMem/ ((4 * indexCount) + phaseOneBufferCount));
       if(bufferSize >= MIN_BUFFER_SIZE)
       {
         break;
       }
     }
     Message message =
           NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT.get(oldThreadCount, threadCount);
     logError(message);
  }
  private boolean getBufferSizes(long availMem)
  {
    boolean maxBuf = false;
    bufferSize = (int) (availMem / ((4 * indexCount) + phaseOneBufferCount));
    if(bufferSize >= MIN_BUFFER_SIZE)
    {
      if(bufferSize > MAX_BUFFER_SIZE)
      {
        bufferSize = MAX_BUFFER_SIZE;
        maxBuf = true;
      }
    }
    else if(bufferSize < MIN_BUFFER_SIZE)
    {
      adjustBufferSize(availMem);
    }
    return maxBuf;
  }
  /**
   * Return the suffix instance in the specified map that matches the specified
@@ -410,155 +387,204 @@
  }
  private long getTmpEnvironmentMemory(long availableMemoryImport)
  {
    int tmpMemPct = TMPENV_MEM_PCT;
    tmpEnvCacheSize = (availableMemoryImport * tmpMemPct) / 100;
    availableMemoryImport -= tmpEnvCacheSize;
    if(!clearedBackend)
    {
      long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
      tmpEnvCacheSize -= additionalDBCache;
      dbCacheSize += additionalDBCache;
    }
    return availableMemoryImport;
  }
  //Used for large heap sizes when the buffer max size has been identified. Any
  //extra memory can be given to the temporary environment in that case.
  private void adjustTmpEnvironmentMemory(long availableMemoryImport)
  {
    long additionalMem = availableMemoryImport -
            (phaseOneBufferCount * MAX_BUFFER_SIZE);
    if(additionalMem > 0)
    {
      tmpEnvCacheSize += additionalMem;
      if(!clearedBackend)
      {
        //The DN cache probably needs to be smaller and the DB cache bigger
        //because the dn2id is checked if the backend has not been cleared.
        long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
        tmpEnvCacheSize -= additionalDBCache;
        dbCacheSize += additionalDBCache;
      }
    }
  }
  private long defaultMemoryCalc(long availMem)
          throws InitializationException
  {
    long bufMem;
    if(availMem < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
    {
      long minCacheSize = MIN_DB_CACHE_SIZE;
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) {
        minCacheSize = 500 *KB;
      }
      dbCacheSize = minCacheSize;
      tmpEnvCacheSize = minCacheSize;
      dbLogBufSize = 0;
      bufMem = availMem - 2 * minCacheSize;
      if(bufMem < 0 || (bufMem < (2 * indexCount) * MIN_BUFFER_SIZE)) {
        Message message =
              ERR_IMPORT_LDIF_LACK_MEM.get(availMem,
                 ((2 * indexCount) * MIN_BUFFER_SIZE) + 2 * MIN_DB_CACHE_SIZE);
        throw new InitializationException(message);
      }
    }
    else
    {
      bufMem = getTmpEnvironmentMemory(availMem);
    }
    return bufMem;
  }
  private long skipDNValidationCalc(long availMem)
          throws InitializationException
  {
    long bufMem = availMem;
    if(availMem < (MIN_DB_CACHE_MEMORY))
    {
      long minCacheSize = MIN_DB_CACHE_SIZE;
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) {
        minCacheSize = 500 *KB;
      }
      dbCacheSize = minCacheSize;
      dbLogBufSize = 0;
      bufMem = availMem - minCacheSize;
      if(bufMem < 0 || (bufMem < (2 * indexCount) * MIN_BUFFER_SIZE)) {
        Message message =
              ERR_IMPORT_LDIF_LACK_MEM.get(availMem,
                  ((2 * indexCount) * MIN_BUFFER_SIZE) + MIN_DB_CACHE_SIZE);
        throw new InitializationException(message);
      }
    }
    return bufMem;
  }
  /**
   * Calculate buffer sizes and initialize JEB properties based on memory.
   *
   * @param envConfig The environment config to use in the calculations.
   *
   * @throws InitializationException If a problem occurs during calculation.
   * @param envConfig
   *          The environment config to use in the calculations.
   * @throws InitializationException
   *           If a problem occurs during calculation.
   */
  private void initializeDBEnv(EnvironmentConfig envConfig)
          throws InitializationException
      throws InitializationException
  {
      Message message;
      phaseOneBufferCount = 2 * (indexCount * threadCount);
      Runtime runTime = Runtime.getRuntime();
      long totFreeMemory = runTime.freeMemory() +
                            (runTime.maxMemory() - runTime.totalMemory());
      int importMemPct = (100 - JVM_MEM_PCT);
      if(totFreeMemory <= SMALL_HEAP_SIZE)
    // Calculate amount of usable memory. This will need to take into account
    // various fudge factors, including the number of IO buffers used by the
    // scratch writers (1 per index).
    final long availableMemory = calculateAvailableMemory();
    final long usableMemory = availableMemory
        - (indexCount * READER_WRITER_BUFFER_SIZE);
    if (!skipDNValidation)
    {
      // No DN validation: calculate memory for DB cache, DN2ID temporary cache,
      // and buffers.
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        importMemPct -= 15;
        dbCacheSize = 500 * KB;
        tmpEnvCacheSize = 500 * KB;
      }
      if(rebuildManager != null)
      else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
      {
        importMemPct -= 15;
        dbCacheSize = MIN_DB_CACHE_SIZE;
        tmpEnvCacheSize = MIN_DB_CACHE_SIZE;
      }
      long phaseOneBufferMemory;
      if(!skipDNValidation)
      else if (!clearedBackend)
      {
        phaseOneBufferMemory =
                       defaultMemoryCalc((totFreeMemory * importMemPct) / 100);
        // Appending to existing data so reserve extra memory for the DB cache
        // since it will be needed for dn2id queries.
        dbCacheSize = usableMemory * 33 / 100;
        tmpEnvCacheSize = usableMemory * 33 / 100;
      }
      else
      {
        phaseOneBufferMemory =
                     skipDNValidationCalc((totFreeMemory * importMemPct) / 100);
        dbCacheSize = MAX_DB_CACHE_SIZE;
        tmpEnvCacheSize = usableMemory * 66 / 100;
      }
      boolean maxBuffers = getBufferSizes(phaseOneBufferMemory);
      //Give any extra memory to the temp environment cache if there is any.
      if(!skipDNValidation && maxBuffers)
    }
    else
    {
      // No DN validation: calculate memory for DB cache and buffers.
      // No need for DN2ID cache.
      tmpEnvCacheSize = 0;
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        adjustTmpEnvironmentMemory(phaseOneBufferMemory);
        dbCacheSize = 500 * KB;
      }
      message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(phaseOneBufferMemory,
              phaseOneBufferCount);
      else if (usableMemory < MIN_DB_CACHE_MEMORY)
      {
        dbCacheSize = MIN_DB_CACHE_SIZE;
      }
      else
      {
        // No need to differentiate between append/clear backend, since dn2id is
        // not being queried.
        dbCacheSize = MAX_DB_CACHE_SIZE;
      }
    }
    final long phaseOneBufferMemory = usableMemory - dbCacheSize
        - tmpEnvCacheSize;
    final int oldThreadCount = threadCount;
    while (true)
    {
      phaseOneBufferCount = 2 * indexCount * threadCount;
      // Scratch writers allocate 4 buffers per index as well.
      final int totalPhaseOneBufferCount = phaseOneBufferCount
          + (4 * indexCount);
      bufferSize = (int) (phaseOneBufferMemory / totalPhaseOneBufferCount);
      if (bufferSize > MAX_BUFFER_SIZE)
      {
        if (!skipDNValidation)
        {
          // The buffers are big enough: the memory is best used for the DN2ID
          // temp DB.
          bufferSize = MAX_BUFFER_SIZE;
          final long extraMemory = phaseOneBufferMemory
              - (totalPhaseOneBufferCount * bufferSize);
          if (!clearedBackend)
          {
            dbCacheSize += extraMemory / 2;
            tmpEnvCacheSize += extraMemory / 2;
          }
          else
          {
            tmpEnvCacheSize += extraMemory;
          }
        }
        break;
      }
      else if (bufferSize > MIN_BUFFER_SIZE)
      {
        // This is acceptable.
        break;
      }
      else if (threadCount > 1)
      {
        // Retry using less threads.
        threadCount--;
      }
      else
      {
        // Not enough memory.
        final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount
            * MIN_BUFFER_SIZE;
        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
            minimumPhaseOneBufferMemory + dbCacheSize + tmpEnvCacheSize);
        throw new InitializationException(message);
      }
    }
    if (oldThreadCount != threadCount)
    {
      Message message = NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT.get(oldThreadCount,
          threadCount);
      logError(message);
      if(tmpEnvCacheSize > 0)
      {
         message = NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM.get(tmpEnvCacheSize);
        logError(message);
      }
      envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
      envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY,
              Long.toString(dbCacheSize));
      message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize,
              bufferSize);
    }
    Message message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(
        phaseOneBufferMemory, phaseOneBufferCount);
    logError(message);
    if (tmpEnvCacheSize > 0)
    {
      message = NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM.get(tmpEnvCacheSize);
      logError(message);
      if(dbLogBufSize > 0)
    }
    envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY, Long
        .toString(dbCacheSize));
    message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
    logError(message);
  }
  /**
   * Returns the amount of available memory which can be used by this import,
   * taking into account whether or not the import is running offline or online
   * as a task.
   */
  private long calculateAvailableMemory()
  {
    final long availableMemory;
    if (DirectoryServer.isRunning())
    {
      // Online import/rebuild.
      Runtime runTime = Runtime.getRuntime();
      runTime.gc();
      runTime.gc();
      final long usedMemory = runTime.totalMemory() - runTime.freeMemory();
      final long maxUsableMemory = Platform.getUsableMemoryForCaching();
      final long usableMemory = maxUsableMemory - usedMemory;
      final long configuredMemory;
      if (backendConfiguration.getDBCacheSize() > 0)
      {
        envConfig.setConfigParam(EnvironmentConfig.LOG_TOTAL_BUFFER_BYTES,
                Long.toString(MAX_DB_LOG_SIZE));
        message = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(MAX_DB_LOG_SIZE);
        logError(message);
        configuredMemory = backendConfiguration.getDBCacheSize();
      }
      else
      {
        configuredMemory = backendConfiguration.getDBCachePercent()
            * Runtime.getRuntime().maxMemory() / 100;
      }
      availableMemory = Math.min(usableMemory, configuredMemory);
    }
    else
    {
      // Offline import/rebuild.
      availableMemory = Platform.getUsableMemoryForCaching();
    }
    // Now take into account various fudge factors.
    int importMemPct = 90;
    if (availableMemory <= SMALL_HEAP_SIZE)
    {
      // Be pessimistic when memory is low.
      importMemPct -= 25;
    }
    if (rebuildManager != null)
    {
      // Rebuild seems to require more overhead.
      importMemPct -= 15;
    }
    return (availableMemory * importMemPct / 100);
  }
@@ -566,7 +592,7 @@
  {
    for(int i = 0; i < phaseOneBufferCount; i++)
    {
      IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
      IndexOutputBuffer b = new IndexOutputBuffer(bufferSize);
      freeBufferQueue.add(b);
    }
  }
@@ -802,10 +828,10 @@
    this.rootContainer = rootContainer;
    try
    {
    reader = new LDIFReader(importConfiguration, rootContainer,
                                 READER_WRITER_BUFFER_SIZE);
      reader = new LDIFReader(importConfiguration, rootContainer,
          READER_WRITER_BUFFER_SIZE);
    }
    catch(IOException ioe)
    catch (IOException ioe)
    {
      Message message = ERR_JEB_IMPORT_LDIF_READER_IO_ERROR.get();
      throw new InitializationException(message, ioe);
@@ -813,9 +839,8 @@
    try
    {
      Message message =
              NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
                      BUILD_ID, REVISION_NUMBER);
      Message message = NOTE_JEB_IMPORT_STARTING.get(
          DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
      logError(message);
      message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
      logError(message);
@@ -823,11 +848,11 @@
      long startTime = System.currentTimeMillis();
      phaseOne();
      long phaseOneFinishTime = System.currentTimeMillis();
      if(!skipDNValidation)
      if (!skipDNValidation)
      {
         tmpEnv.shutdown();
        tmpEnv.shutdown();
      }
      if(isPhaseOneCanceled)
      if (isPhaseOneCanceled)
      {
        throw new InterruptedException("Import processing canceled.");
      }
@@ -840,24 +865,22 @@
      long finishTime = System.currentTimeMillis();
      long importTime = (finishTime - startTime);
      float rate = 0;
      message = NOTE_JEB_IMPORT_PHASE_STATS.get(importTime/1000,
                        (phaseOneFinishTime - startTime)/1000,
                        (phaseTwoFinishTime - phaseTwoTime)/1000);
      message = NOTE_JEB_IMPORT_PHASE_STATS.get(importTime / 1000,
          (phaseOneFinishTime - startTime) / 1000,
          (phaseTwoFinishTime - phaseTwoTime) / 1000);
      logError(message);
      if (importTime > 0)
        rate = 1000f * reader.getEntriesRead() / importTime;
        message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
                  importCount.get(), reader.getEntriesIgnored(),
                  reader.getEntriesRejected(), migratedCount,
                  importTime / 1000, rate);
      if (importTime > 0) rate = 1000f * reader.getEntriesRead() / importTime;
      message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
          importCount.get(), reader.getEntriesIgnored(),
          reader.getEntriesRejected(), migratedCount, importTime / 1000, rate);
      logError(message);
    }
    finally
    {
      reader.close();
    }
    return new LDIFImportResult(reader.getEntriesRead(), reader
            .getEntriesRejected(), reader.getEntriesIgnored());
    return new LDIFImportResult(reader.getEntriesRead(),
        reader.getEntriesRejected(), reader.getEntriesIgnored());
  }
@@ -921,31 +944,34 @@
  }
  private void phaseOne() throws InterruptedException, ExecutionException
  {
    initializeIndexBuffers();
    FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
    ScheduledThreadPoolExecutor timerService =
      new ScheduledThreadPoolExecutor(1);
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(
        1);
    timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL,
      TIMER_INTERVAL, TimeUnit.MILLISECONDS);
        TIMER_INTERVAL, TimeUnit.MILLISECONDS);
    scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
    bufferSortService = Executors.newFixedThreadPool(threadCount);
    ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    tasks.add(new MigrateExistingTask());
    List<Future<Void>> results = execService.invokeAll(tasks);
    for (Future<Void> result : results) {
      if(!result.isDone()) {
    for (Future<Void> result : results)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }
    tasks.clear();
    results.clear();
    if (importConfiguration.appendToExistingData() &&
            importConfiguration.replaceExistingEntries())
    if (importConfiguration.appendToExistingData()
        && importConfiguration.replaceExistingEntries())
    {
     for (int i = 0; i < threadCount; i++)
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new AppendReplaceTask());
      }
@@ -959,24 +985,32 @@
    }
    results = execService.invokeAll(tasks);
    for (Future<Void> result : results)
      if(!result.isDone()) {
    {
      if (!result.isDone())
      {
        result.get();
      }
    }
    tasks.clear();
    results.clear();
    tasks.add(new MigrateExcludedTask());
    results = execService.invokeAll(tasks);
    for (Future<Void> result : results)
      if(!result.isDone()) {
        result.get();
      }
    stopScratchFileWriters();
    for (Future<?> result : scratchFileWriterFutures)
    {
     if(!result.isDone()) {
      if (!result.isDone())
      {
        result.get();
      }
    }
    stopScratchFileWriters();
    for (Future<?> result : scratchFileWriterFutures)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }
    // Shutdown the executor services
    timerService.shutdown();
    timerService.awaitTermination(30, TimeUnit.SECONDS);
@@ -987,7 +1021,7 @@
    scratchFileWriterService.shutdown();
    scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
    //Try to clear as much memory as possible.
    // Try to clear as much memory as possible.
    scratchFileWriterList.clear();
    scratchFileWriterFutures.clear();
    indexKeyQueMap.clear();
@@ -996,41 +1030,31 @@
  private void phaseTwo() throws InterruptedException, JebException,
          ExecutionException
  private void phaseTwo() throws InitializationException, InterruptedException,
      JebException, ExecutionException
  {
    SecondPhaseProgressTask progress2Task =
            new SecondPhaseProgressTask(reader.getEntriesRead());
    ScheduledThreadPoolExecutor timerService =
      new ScheduledThreadPoolExecutor(1);
    SecondPhaseProgressTask progress2Task = new SecondPhaseProgressTask(
        reader.getEntriesRead());
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(
        1);
    timerService.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL,
      TIMER_INTERVAL, TimeUnit.MILLISECONDS);
    processIndexFiles();
    timerService.shutdown();
    timerService.awaitTermination(30, TimeUnit.SECONDS);
  }
  private int getBufferCount(int dbThreads)
  {
    int buffers = 0;
    List<IndexManager> totList = new LinkedList<IndexManager>(DNIndexMgrList);
    totList.addAll(indexMgrList);
    Collections.sort(totList, Collections.reverseOrder());
    int limit = Math.min(dbThreads, totList.size());
    for(int i = 0; i < limit; i ++)
        TIMER_INTERVAL, TimeUnit.MILLISECONDS);
    try
    {
      buffers += totList.get(i).getBufferList().size();
      processIndexFiles();
    }
    return buffers;
    finally
    {
      timerService.shutdown();
      timerService.awaitTermination(30, TimeUnit.SECONDS);
    }
  }
  private void processIndexFiles() throws InterruptedException,
          JebException, ExecutionException
  private void processIndexFiles() throws InitializationException,
      InterruptedException, JebException, ExecutionException
  {
    ExecutorService dbService;
    if(bufferCount.get() == 0)
    {
      return;
@@ -1040,60 +1064,89 @@
    {
      dbThreads = 4;
    }
    int readAheadSize =  cacheSizeFromFreeMemory(getBufferCount(dbThreads));
    // Calculate memory / buffer counts.
    final long availableMemory = calculateAvailableMemory();
    final long usableMemory = availableMemory - dbCacheSize;
    int readAheadSize;
    int buffers;
    while (true)
    {
      final List<IndexManager> totList = new ArrayList<IndexManager>(
          DNIndexMgrList);
      totList.addAll(indexMgrList);
      Collections.sort(totList, Collections.reverseOrder());
      buffers = 0;
      final int limit = Math.min(dbThreads, totList.size());
      for (int i = 0; i < limit; i++)
      {
        buffers += totList.get(i).bufferIndexCount;
      }
      readAheadSize = (int) (usableMemory / buffers);
      if (readAheadSize > bufferSize)
      {
        // Cache size is never larger than the buffer size.
        readAheadSize = bufferSize;
        break;
      }
      else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
      {
        // This is acceptable.
        break;
      }
      else if (dbThreads > 1)
      {
        // Reduce thread count.
        dbThreads--;
      }
      else
      {
        // Not enough memory.
        final long minimumPhaseTwoBufferMemory = buffers
            * MIN_READ_AHEAD_CACHE_SIZE;
        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
            minimumPhaseTwoBufferMemory + dbCacheSize);
        throw new InitializationException(message);
      }
    }
    Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(
        availableMemory, readAheadSize, buffers);
    logError(message);
    // Start indexing tasks.
    List<Future<Void>> futures = new LinkedList<Future<Void>>();
    dbService = Executors.newFixedThreadPool(dbThreads);
    //Start DN processing first.
    for(IndexManager dnMgr : DNIndexMgrList)
    ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
    // Start DN processing first.
    for (IndexManager dnMgr : DNIndexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
    }
    for(IndexManager mgr : indexMgrList)
    for (IndexManager mgr : indexMgrList)
    {
       futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
      futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
    }
    for (Future<Void> result : futures)
      if(!result.isDone()) {
    {
      if (!result.isDone())
      {
        result.get();
      }
    }
    dbService.shutdown();
  }
  private int cacheSizeFromFreeMemory(int buffers)
  {
    Runtime runTime = Runtime.getRuntime();
    runTime.gc();
    runTime.gc();
    long freeMemory = runTime.freeMemory();
    long maxMemory = runTime.maxMemory();
    long totMemory = runTime.totalMemory();
    long totFreeMemory = (freeMemory + (maxMemory - totMemory));
    int importMemPct = (100 - JVM_MEM_PCT);
    //For very small heaps, give more memory to the JVM.
    if(totFreeMemory <= SMALL_HEAP_SIZE)
    {
        importMemPct -= 35;
    }
    long availableMemory = (totFreeMemory * importMemPct) / 100;
    int averageBufferSize = (int)(availableMemory /buffers);
    int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, averageBufferSize);
    //Cache size is never larger than the buffer size.
    if(cacheSize > bufferSize)
    {
      cacheSize = bufferSize;
    }
    Message message =
     NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(availableMemory,
                                                   cacheSize, buffers);
    logError(message);
    return cacheSize;
  }
  private void stopScratchFileWriters()
  {
    IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
    IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
    for(ScratchFileWriterTask task : scratchFileWriterList)
    {
      task.queue.add(indexBuffer);
@@ -1293,7 +1346,7 @@
        {
          if (importConfiguration.isCancelled() || isPhaseOneCanceled)
          {
            IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
            IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
            freeBufferQueue.add(indexBuffer);
            return null;
          }
@@ -1462,8 +1515,8 @@
   */
  private  class ImportTask implements Callable<Void>
  {
    private final Map<IndexKey, IndexBuffer> indexBufferMap =
                                     new HashMap<IndexKey, IndexBuffer>();
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap =
                                     new HashMap<IndexKey, IndexOutputBuffer>();
    private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
    private final EntryInformation entryInfo = new EntryInformation();
    private DatabaseEntry keyEntry = new DatabaseEntry(),
@@ -1481,7 +1534,7 @@
        {
          if (importConfiguration.isCancelled() || isPhaseOneCanceled)
          {
            IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
            IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
            freeBufferQueue.add(indexBuffer);
            return null;
          }
@@ -1653,15 +1706,16 @@
    void flushIndexBuffers() throws InterruptedException,
                 ExecutionException
    {
       Set<Map.Entry<IndexKey, IndexBuffer>> set = indexBufferMap.entrySet();
       Iterator<Map.Entry<IndexKey, IndexBuffer>> setIterator = set.iterator();
       Set<Map.Entry<IndexKey, IndexOutputBuffer>> set =
         indexBufferMap.entrySet();
       Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator =
         set.iterator();
       while(setIterator.hasNext())
        {
          Map.Entry<IndexKey, IndexBuffer> e = setIterator.next();
          Map.Entry<IndexKey, IndexOutputBuffer> e = setIterator.next();
          IndexKey indexKey = e.getKey();
          IndexBuffer indexBuffer = e.getValue();
          IndexOutputBuffer indexBuffer = e.getValue();
          setIterator.remove();
          ImportIndexType indexType = indexKey.getIndexType();
          indexBuffer.setComparator(indexComparator);
          indexBuffer.setIndexKey(indexKey);
          indexBuffer.setDiscard();
@@ -1674,27 +1728,22 @@
    int
    processKey(DatabaseContainer container, byte[] key, EntryID entryID,
         IndexBuffer.ComparatorBuffer<byte[]> comparator, IndexKey indexKey,
         boolean insert)
         IndexOutputBuffer.ComparatorBuffer<byte[]> comparator,
         IndexKey indexKey, boolean insert)
         throws ConfigException, InterruptedException
    {
      IndexBuffer indexBuffer;
      if(!indexBufferMap.containsKey(indexKey))
      IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
      if (indexBuffer == null)
      {
        indexBuffer = getNewIndexBuffer();
        indexBufferMap.put(indexKey, indexBuffer);
      }
      else
      {
        indexBuffer = indexBufferMap.get(indexKey);
      }
      if(!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      {
        indexBuffer.setComparator(comparator);
        indexBuffer.setIndexKey(indexKey);
        bufferSortService.submit(new SortTask(indexBuffer));
        indexBuffer = getNewIndexBuffer();
        indexBufferMap.remove(indexKey);
        indexBufferMap.put(indexKey, indexBuffer);
      }
      int id = System.identityHashCode(container);
@@ -1703,9 +1752,10 @@
    }
    IndexBuffer getNewIndexBuffer() throws ConfigException, InterruptedException
    IndexOutputBuffer getNewIndexBuffer() throws ConfigException,
      InterruptedException
    {
      IndexBuffer indexBuffer = freeBufferQueue.take();
      IndexOutputBuffer indexBuffer = freeBufferQueue.take();
        if(indexBuffer == null)
        {
         Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
@@ -1774,15 +1824,25 @@
    }
    private SortedSet<Buffer> initializeBuffers() throws IOException
    private NavigableSet<IndexInputBuffer> initializeBuffers()
      throws IOException
    {
      SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
      for(Buffer b : indexMgr.getBufferList())
      NavigableSet<IndexInputBuffer> bufferSet =
        new TreeSet<IndexInputBuffer>();
      for (int i = 0; i < indexMgr.bufferIndexCount; i++)
      {
        b.initializeCache(indexMgr, null, cacheSize);
        IndexInputBuffer b = new IndexInputBuffer(indexMgr,
            indexMgr.bufferIndexBegin[i], indexMgr.bufferIndexEnd[i],
            indexMgr.bufferIndexID[i]);
        b.initializeCache(cacheSize);
        bufferSet.add(b);
      }
      indexMgr.getBufferList().clear();
      // GC arrays.
      indexMgr.bufferIndexBegin = null;
      indexMgr.bufferIndexEnd = null;
      indexMgr.bufferIndexID = null;
      return bufferSet;
    }
@@ -1792,78 +1852,103 @@
     */
    public Void call() throws Exception
    {
      ByteBuffer cKey = null;
      ImportIDSet cInsertIDSet =  new ImportIDSet(),
                  cDeleteIDSet =  new ImportIDSet();
      Thread.setDefaultUncaughtExceptionHandler(
             new DefaultExceptionHandler());
      indexMgr.setStarted();
      Message message =
              NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(),
                         indexMgr.getBufferList().size());
                         indexMgr.bufferIndexCount);
      logError(message);
      Integer cIndexID = null;
      ByteBuffer key = null;
      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      Integer indexID = null;
      try
      {
        indexMgr.openIndexFile();
        SortedSet<Buffer> bufferSet = initializeBuffers();
        while(!bufferSet.isEmpty())
        NavigableSet<IndexInputBuffer> bufferSet = initializeBuffers();
        while (!bufferSet.isEmpty())
        {
          Buffer b;
          b = bufferSet.first();
          bufferSet.remove(b);
          if(cKey == null)
          IndexInputBuffer b = bufferSet.pollFirst();
          if (key == null)
          {
            cKey = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
            cIndexID =  b.getIndexID();
            cKey.clear();
            if(b.getKeyLen() > cKey.capacity())
            indexID = b.getIndexID();
            if (indexMgr.isDN2ID())
            {
              cKey = ByteBuffer.allocate(b.getKeyLen());
            }
            cKey.flip();
            b.getKey(cKey);
            cInsertIDSet.merge(b.getInsertIDSet());
            cDeleteIDSet.merge(b.getDeleteIDSet());
            cInsertIDSet.setKey(cKey);
            cDeleteIDSet.setKey(cKey);
          }
          else
          {
            if(b.compare(cKey, cIndexID) != 0)
            {
              addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
              indexMgr.incrementKeyCount();
              cIndexID =  b.getIndexID();
              cKey.clear();
              if(b.getKeyLen() > cKey.capacity())
              {
                 cKey = ByteBuffer.allocate(b.getKeyLen());
              }
              cKey.flip();
              b.getKey(cKey);
              cInsertIDSet.clear(true);
              cDeleteIDSet.clear(true);
              cInsertIDSet.merge(b.getInsertIDSet());
              cDeleteIDSet.merge(b.getDeleteIDSet());
              cInsertIDSet.setKey(cKey);
              cDeleteIDSet.setKey(cKey);
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
            }
            else
            {
              cInsertIDSet.merge(b.getInsertIDSet());
              cDeleteIDSet.merge(b.getDeleteIDSet());
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
            }
            key = ByteBuffer.allocate(b.getKeyLen());
            key.flip();
            b.getKey(key);
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
          }
          else if (b.compare(key, indexID) != 0)
          {
            addToDB(insertIDSet, deleteIDSet, indexID);
            indexMgr.incrementKeyCount();
            indexID = b.getIndexID();
            if (indexMgr.isDN2ID())
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
            }
            key.clear();
            if (b.getKeyLen() > key.capacity())
            {
              key = ByteBuffer.allocate(b.getKeyLen());
            }
            key.flip();
            b.getKey(key);
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
          }
          else
          {
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
          }
          if(b.hasMoreData())
          {
            b.getNextRecord();
            bufferSet.add(b);
          }
        }
        if(cKey != null)
        if(key != null)
        {
          addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
          addToDB(insertIDSet, deleteIDSet, indexID);
        }
      }
      catch (Exception e)
@@ -1986,7 +2071,7 @@
      private ByteBuffer parentDN, lastDN;
      private EntryID parentID, lastID, entryID;
      private final DatabaseEntry DNKey, DNValue;
      private final DatabaseEntry dnKey, dnValue;
      private final TreeMap<ByteBuffer, EntryID> parentIDMap;
      private final EntryContainer entryContainer;
      private final Map<byte[], ImportIDSet> id2childTree;
@@ -2009,8 +2094,8 @@
        subTreeLimit = entryContainer.getID2Subtree().getIndexEntryLimit();
        subTreeDoCount = entryContainer.getID2Subtree().getMaintainCount();
        id2subtreeTree =  new TreeMap<byte[], ImportIDSet>(subComparator);
        DNKey = new DatabaseEntry();
        DNValue = new DatabaseEntry();
        dnKey = new DatabaseEntry();
        dnValue = new DatabaseEntry();
        lastDN = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
      }
@@ -2052,10 +2137,10 @@
      private boolean checkParent(ImportIDSet record) throws DirectoryException,
              DatabaseException
      {
        DNKey.setData(record.getKey().array(), 0 , record.getKey().limit());
        dnKey.setData(record.getKey().array(), 0 , record.getKey().limit());
        byte[] v = record.toDatabase();
        long v1 = JebFormat.entryIDFromDatabase(v);
        DNValue.setData(v);
        dnValue.setData(v);
        entryID = new EntryID(v1);
        parentDN = getParent(record.getKey());
@@ -2068,7 +2153,8 @@
          //If null is returned than this is a suffix DN.
          if(parentDN != null)
          {
            DatabaseEntry key = new DatabaseEntry(parentDN.array());
            DatabaseEntry key =
              new DatabaseEntry(parentDN.array(), 0 , parentDN.limit());
            DatabaseEntry value = new DatabaseEntry();
            OperationStatus status;
            status =
@@ -2166,7 +2252,7 @@
        if (importConfiguration != null &&
            importConfiguration.appendToExistingData())
        {
            DatabaseEntry key = new DatabaseEntry(dn.array());
            DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
            DatabaseEntry value = new DatabaseEntry();
            OperationStatus status;
            status =
@@ -2235,7 +2321,7 @@
      public void writeToDB() throws DatabaseException, DirectoryException
      {
        entryContainer.getDN2ID().put(null, DNKey, DNValue);
        entryContainer.getDN2ID().put(null, dnKey, dnValue);
        indexMgr.addTotDNCount(1);
        if(parentDN != null)
        {
@@ -2253,8 +2339,8 @@
        {
          byte[] key = e.getKey();
          ImportIDSet idSet = e.getValue();
          DNKey.setData(key);
          index.insert(DNKey, idSet, DNValue);
          dnKey.setData(key);
          index.insert(dnKey, idSet, dnValue);
        }
        index.closeCursor();
        if(clearMap)
@@ -2281,7 +2367,7 @@
  {
    private final int DRAIN_TO = 3;
    private final IndexManager indexMgr;
    private final BlockingQueue<IndexBuffer> queue;
    private final BlockingQueue<IndexOutputBuffer> queue;
    private final ByteArrayOutputStream insetByteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final ByteArrayOutputStream deleteByteStream =
@@ -2289,13 +2375,13 @@
    private final byte[] tmpArray = new byte[8];
    private int insertKeyCount = 0, deleteKeyCount = 0;
    private final DataOutputStream dataStream;
    private long bufferCount = 0;
    private int bufferCount = 0;
    private final File file;
    private final SortedSet<IndexBuffer> indexSortedSet;
    private final SortedSet<IndexOutputBuffer> indexSortedSet;
    private boolean poisonSeen = false;
    public ScratchFileWriterTask(BlockingQueue<IndexBuffer> queue,
    public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
                             IndexManager indexMgr) throws FileNotFoundException
    {
      this.queue = queue;
@@ -2305,7 +2391,7 @@
              new BufferedOutputStream(new FileOutputStream(file),
                                       READER_WRITER_BUFFER_SIZE);
      dataStream = new DataOutputStream(bufferedStream);
      indexSortedSet = new TreeSet<IndexBuffer>();
      indexSortedSet = new TreeSet<IndexOutputBuffer>();
    }
@@ -2315,11 +2401,11 @@
    public Void call() throws IOException
    {
      long offset = 0;
      List<IndexBuffer> l = new LinkedList<IndexBuffer>();
      List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
      try {
        while(true)
        {
          IndexBuffer indexBuffer = queue.poll();
          IndexOutputBuffer indexBuffer = queue.poll();
          if(indexBuffer != null)
          {
            long beginOffset = offset;
@@ -2329,7 +2415,7 @@
              queue.drainTo(l, DRAIN_TO);
              l.add(indexBuffer);
              bufferLen = writeIndexBuffers(l);
              for(IndexBuffer id : l)
              for(IndexOutputBuffer id : l)
              {
                if(!id.isDiscard())
                {
@@ -2353,7 +2439,7 @@
              }
            }
            offset += bufferLen;
            indexMgr.addBuffer(new Buffer(beginOffset, offset, bufferCount));
            indexMgr.addBuffer(beginOffset, offset, bufferCount);
            bufferCount++;
            Importer.this.bufferCount.incrementAndGet();
            if(poisonSeen)
@@ -2381,7 +2467,8 @@
    }
    private long writeIndexBuffer(IndexBuffer indexBuffer) throws IOException
    private long writeIndexBuffer(IndexOutputBuffer indexBuffer)
      throws IOException
    {
      int numberKeys = indexBuffer.getNumberKeys();
      indexBuffer.setPosition(-1);
@@ -2433,14 +2520,14 @@
    }
    private long writeIndexBuffers(List<IndexBuffer> buffers)
    private long writeIndexBuffers(List<IndexOutputBuffer> buffers)
            throws IOException
    {
      long id = 0;
      long bufferLen = 0;
      insetByteStream.reset(); insertKeyCount = 0;
      deleteByteStream.reset(); deleteKeyCount = 0;
      for(IndexBuffer b : buffers)
      for(IndexOutputBuffer b : buffers)
      {
        if(b.isPoison())
        {
@@ -2457,7 +2544,7 @@
      int saveIndexID = 0;
      while(!indexSortedSet.isEmpty())
      {
        IndexBuffer b = indexSortedSet.first();
        IndexOutputBuffer b = indexSortedSet.first();
        indexSortedSet.remove(b);
        if(saveKey == null)
        {
@@ -2563,7 +2650,7 @@
    }
    private int writeRecord(IndexBuffer b) throws IOException
    private int writeRecord(IndexOutputBuffer b) throws IOException
    {
      int keySize = b.getKeySize();
      int packedSize = writeHeader(b.getIndexID(), keySize);
@@ -2595,9 +2682,9 @@
  private final class SortTask implements Callable<Void>
  {
    private final IndexBuffer indexBuffer;
    private final IndexOutputBuffer indexBuffer;
    public SortTask(IndexBuffer indexBuffer)
    public SortTask(IndexOutputBuffer indexBuffer)
    {
      this.indexBuffer = indexBuffer;
    }
@@ -2607,23 +2694,24 @@
     */
    public Void call() throws Exception
    {
      if (importConfiguration != null &&
          importConfiguration.isCancelled() || isPhaseOneCanceled)
      if (importConfiguration != null && importConfiguration.isCancelled()
          || isPhaseOneCanceled)
      {
        isPhaseOneCanceled =true;
        isPhaseOneCanceled = true;
        return null;
      }
      indexBuffer.sort();
      if(indexKeyQueMap.containsKey(indexBuffer.getIndexKey())) {
        BlockingQueue<IndexBuffer> q =
                indexKeyQueMap.get(indexBuffer.getIndexKey());
      if (indexKeyQueMap.containsKey(indexBuffer.getIndexKey()))
      {
        BlockingQueue<IndexOutputBuffer> q = indexKeyQueMap.get(indexBuffer
            .getIndexKey());
        q.add(indexBuffer);
      }
      else
      {
        createIndexWriterTask(indexBuffer.getIndexKey());
        BlockingQueue<IndexBuffer> q =
                                 indexKeyQueMap.get(indexBuffer.getIndexKey());
        BlockingQueue<IndexOutputBuffer> q = indexKeyQueMap.get(indexBuffer
            .getIndexKey());
        q.add(indexBuffer);
      }
      return null;
@@ -2653,8 +2741,8 @@
        {
          indexMgrList.add(indexMgr);
        }
        BlockingQueue<IndexBuffer> newQue =
                new ArrayBlockingQueue<IndexBuffer>(phaseOneBufferCount);
        BlockingQueue<IndexOutputBuffer> newQue =
                new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
        ScratchFileWriterTask indexWriter =
                new ScratchFileWriterTask(newQue, indexMgr);
        scratchFileWriterList.add(indexWriter);
@@ -2666,327 +2754,6 @@
  }
  /**
   * The buffer class is used to process a buffer from the temporary index files
   * during phase 2 processing.
   */
  private final class Buffer implements Comparable<Buffer>
  {
    private IndexManager indexMgr;
    private final long begin, end, id;
    private long offset;
    private ByteBuffer cache;
    private int limit;
    private ImportIDSet insertIDSet = null, deleteIDSet = null;
    private Integer indexID = null;
    private boolean doCount;
    private ByteBuffer keyBuf = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
    public Buffer(long begin, long end, long id)
    {
      this.begin = begin;
      this.end = end;
      this.offset = 0;
      this.id = id;
    }
    private void initializeCache(IndexManager indexMgr, ByteBuffer b,
                      long cacheSize) throws IOException
    {
      this.indexMgr = indexMgr;
      if(b == null)
      {
        cache = ByteBuffer.allocate((int)cacheSize);
      }
      else
      {
        cache = b;
      }
      loadCache();
      cache.flip();
      keyBuf.flip();
    }
    private void loadCache() throws IOException
    {
      FileChannel fileChannel = indexMgr.getChannel();
      fileChannel.position(begin + offset);
      long leftToRead =  end - (begin + offset);
      long bytesToRead;
      if(leftToRead < cache.remaining())
      {
        cache.limit((int) (cache.position() + leftToRead));
        bytesToRead = (int)leftToRead;
      }
      else
      {
        bytesToRead = Math.min((end - offset),cache.remaining());
      }
      int bytesRead = 0;
      while(bytesRead < bytesToRead)
      {
        bytesRead += fileChannel.read(cache);
      }
      offset += bytesRead;
      indexMgr.addBytesRead(bytesRead);
    }
      public boolean hasMoreData() throws IOException
      {
        boolean ret = ((begin + offset) >= end);
        return !(cache.remaining() == 0 && ret);
      }
    public int getKeyLen()
    {
      return keyBuf.limit();
    }
    public void getKey(ByteBuffer b)
    {
      keyBuf.get(b.array(), 0, keyBuf.limit());
      b.limit(keyBuf.limit());
    }
    ByteBuffer getKeyBuf()
    {
      return keyBuf;
    }
    public ImportIDSet getInsertIDSet()
    {
      return insertIDSet;
    }
    public ImportIDSet getDeleteIDSet()
    {
      return deleteIDSet;
    }
    public long getBufferID()
    {
      return id;
    }
    public Integer getIndexID()
    {
      if(indexID == null)
      {
        try {
          getNextRecord();
        } catch(IOException ex) {
           Message message =
               ERR_JEB_IMPORT_BUFFER_IO_ERROR.get(indexMgr.getFileName());
           logError(message);
           ex.printStackTrace();
           System.exit(1);
        }
      }
      return indexID;
    }
    public void getNextRecord()  throws IOException
    {
      getNextIndexID();
      getContainerParameters();
      getNextKey();
      getNextIDSet(true);  //get insert ids
      getNextIDSet(false); //get delete ids
    }
    private void getContainerParameters()
    {
      limit = 1;
      doCount = false;
      if(!indexMgr.isDN2ID())
      {
        Index index = (Index) idContainerMap.get(indexID);
        limit = index.getIndexEntryLimit();
        doCount = index.getMaintainCount();
        if(insertIDSet == null)
        {
          insertIDSet = new ImportIDSet(128, limit, doCount);
          deleteIDSet = new ImportIDSet(128, limit, doCount);
        }
      }
      else
      {
        if(insertIDSet == null)
        {
            insertIDSet = new ImportIDSet(1, limit, doCount);
            deleteIDSet = new ImportIDSet(1, limit, doCount);
        }
      }
    }
    private int getInt()  throws IOException
    {
      ensureData(4);
      return cache.getInt();
    }
    private void getNextIndexID() throws IOException, BufferUnderflowException
     {
       indexID = getInt();
     }
    private void getNextKey() throws IOException, BufferUnderflowException
    {
      ensureData(20);
      byte[] ba = cache.array();
      int p = cache.position();
      int len = PackedInteger.getReadIntLength(ba, p);
      int keyLen = PackedInteger.readInt(ba, p);
      cache.position(p + len);
      if(keyLen > keyBuf.capacity())
      {
        keyBuf = ByteBuffer.allocate(keyLen);
      }
      ensureData(keyLen);
      keyBuf.clear();
      cache.get(keyBuf.array(), 0, keyLen);
      keyBuf.limit(keyLen);
    }
    private void getNextIDSet(boolean insert)
            throws IOException, BufferUnderflowException
    {
      ensureData(20);
      int p = cache.position();
      byte[] ba = cache.array();
      int len = PackedInteger.getReadIntLength(ba, p);
      int keyCount = PackedInteger.readInt(ba, p);
      p += len;
      cache.position(p);
      if(insert)
      {
        insertIDSet.clear(false);
      }
      else
      {
        deleteIDSet.clear(false);
      }
      for(int k = 0; k < keyCount; k++)
      {
        if(ensureData(9))
        {
          p = cache.position();
        }
        len = PackedInteger.getReadLongLength(ba, p);
        long l = PackedInteger.readLong(ba, p);
        p += len;
        cache.position(p);
        if(insert)
        {
          insertIDSet.addEntryID(l);
        }
        else
        {
          deleteIDSet.addEntryID(l);
        }
      }
    }
    private boolean ensureData(int len) throws IOException
    {
      boolean ret = false;
      if(cache.remaining() == 0)
      {
        cache.clear();
        loadCache();
        cache.flip();
        ret = true;
      }
      else if(cache.remaining() < len)
      {
        cache.compact();
        loadCache();
        cache.flip();
        ret = true;
      }
      return ret;
    }
    private int compare(ByteBuffer cKey, Integer cIndexID)
    {
      int returnCode, rc;
      if(keyBuf.limit() == 0)
      {
        getIndexID();
      }
      rc = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
                                     cKey.array(), cKey.limit());
      if(rc != 0) {
        returnCode = 1;
      }
      else
      {
        returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
      }
      return returnCode;
    }
    public int compareTo(Buffer o) {
      //used in remove.
      if(this.equals(o))
      {
        return 0;
      }
      if(keyBuf.limit() == 0) {
        getIndexID();
      }
      if(o.getKeyBuf().limit() == 0)
      {
        o.getIndexID();
      }
      int returnCode;
      byte[] oKey = o.getKeyBuf().array();
      int oLen = o.getKeyBuf().limit();
      returnCode = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
                                             oKey, oLen);
      if(returnCode == 0)
      {
        if(indexID.intValue() == o.getIndexID().intValue())
        {
          if(insertIDSet.isDefined())
          {
            returnCode = -1;
          }
          else if(o.getInsertIDSet().isDefined())
          {
            returnCode = 1;
          }
          else if(insertIDSet.size() == o.getInsertIDSet().size())
          {
            returnCode = id > o.getBufferID() ? 1 : -1;
          }
          else
          {
            returnCode = insertIDSet.size() - o.getInsertIDSet().size();
          }
        }
        else if(indexID > o.getIndexID())
        {
          returnCode = 1;
        }
        else
        {
          returnCode = -1;
        }
      }
      return returnCode;
    }
  }
  /**
   * The index manager class has several functions:
   *
   *   1. It used to carry information about index processing created in phase
@@ -2996,11 +2763,12 @@
   *
   *   3. It manages opening and closing the scratch index files.
   */
  private final class IndexManager implements Comparable<IndexManager>
  final class IndexManager implements Comparable<IndexManager>
  {
    private static final int BUFFER_SIZE = 128;
    private final File file;
    private RandomAccessFile rFile = null;
    private final List<Buffer> bufferList = new LinkedList<Buffer>();
    private long fileLength, bytesRead = 0;
    private boolean done = false, started = false;
    private long totalDNS;
@@ -3009,8 +2777,12 @@
    private final boolean isDN;
    private final int limit;
    private long[] bufferIndexBegin = new long[BUFFER_SIZE];
    private long[] bufferIndexEnd   = new long[BUFFER_SIZE];
    private int[]  bufferIndexID    = new int[BUFFER_SIZE];
    private int    bufferIndexCount = 0;
    IndexManager(String fileName, boolean isDN, int limit)
    private IndexManager(String fileName, boolean isDN, int limit)
    {
      file = new File(tempDir, fileName);
      this.fileName = fileName;
@@ -3019,91 +2791,110 @@
    }
    void openIndexFile() throws FileNotFoundException
    private void openIndexFile() throws FileNotFoundException
    {
      rFile = new RandomAccessFile(file, "r");
    }
    public FileChannel getChannel()
    /**
     * Returns the file channel associated with this index manager.
     *
     * @return The file channel associated with this index manager.
     */
    FileChannel getChannel()
    {
      return rFile.getChannel();
    }
    public void addBuffer(Buffer o)
    private void addBuffer(long begin, long end, int id)
    {
      this.bufferList.add(o);
      int size = bufferIndexBegin.length;
      if (bufferIndexCount >= size)
      {
        size += BUFFER_SIZE;
        bufferIndexBegin = Arrays.copyOf(bufferIndexBegin, size);
        bufferIndexEnd = Arrays.copyOf(bufferIndexEnd, size);
        bufferIndexID = Arrays.copyOf(bufferIndexID, size);
      }
      bufferIndexBegin[bufferIndexCount] = begin;
      bufferIndexEnd[bufferIndexCount] = end;
      bufferIndexID[bufferIndexCount] = id;
      bufferIndexCount++;
    }
    public List<Buffer> getBufferList()
    {
      return bufferList;
    }
    public File getFile()
    private File getFile()
    {
      return file;
    }
    public boolean deleteIndexFile()
    private boolean deleteIndexFile()
    {
      return file.delete();
    }
    public void close() throws IOException
    private void close() throws IOException
    {
      rFile.close();
    }
    public void setFileLength()
    private void setFileLength()
    {
      this.fileLength = file.length();
    }
    public void addBytesRead(int bytesRead)
    /**
     * Updates the bytes read counter.
     *
     * @param bytesRead
     *          The number of bytes read.
     */
    void addBytesRead(int bytesRead)
    {
      this.bytesRead += bytesRead;
    }
    public void setDone()
    private void setDone()
    {
      this.done = true;
    }
    public void setStarted()
    private void setStarted()
    {
      started = true;
    }
    public void addTotDNCount(int delta)
    private void addTotDNCount(int delta)
    {
      this.totalDNS += delta;
    }
    public long getDNCount()
    private long getDNCount()
    {
      return totalDNS;
    }
    public boolean isDN2ID()
    private boolean isDN2ID()
    {
      return isDN;
    }
    public void printStats(long deltaTime)
    private void printStats(long deltaTime)
    {
      if(!done && started)
      {
@@ -3115,38 +2906,35 @@
    }
    public void incrementKeyCount()
    private void incrementKeyCount()
    {
      keyCount.incrementAndGet();
    }
    public String getFileName()
    /**
     * Returns the file name associated with this index manager.
     *
     * @return The file name associated with this index manager.
     */
    String getFileName()
    {
      return fileName;
    }
    public int getLimit()
    private int getLimit()
    {
      return limit;
    }
    /**
     * {@inheritDoc}
     */
    public int compareTo(IndexManager mgr)
    {
      if(bufferList.size() == mgr.getBufferList().size())
      {
         return 0;
      }
      else if (bufferList.size() < mgr.getBufferList().size())
      {
        return -1;
      }
      else
      {
        return 1;
      }
      return bufferIndexCount - mgr.bufferIndexCount;
    }
  }
@@ -3154,7 +2942,7 @@
  /**
   * The rebuild index manager handles all rebuild index related processing.
   */
  class RebuildIndexManager extends ImportTask {
  private class RebuildIndexManager extends ImportTask {
   //Rebuild index configuration.
   private final RebuildConfig rebuildConfig;
@@ -3320,24 +3108,32 @@
    }
    /**
     * Perform rebuild index processing.
     *
     * @throws DatabaseException If an database error occurred.
     * @throws InterruptedException If an interrupted error occurred.
     * @throws ExecutionException If an Excecution error occurred.
     * @throws JebException If an JEB error occurred.
     * @throws InitializationException
     *           If an initialization error occurred.
     * @throws DatabaseException
     *           If an database error occurred.
     * @throws InterruptedException
     *           If an interrupted error occurred.
     * @throws ExecutionException
     *           If an Excecution error occurred.
     * @throws JebException
     *           If an JEB error occurred.
     */
    public void rebuldIndexes() throws DatabaseException, InterruptedException,
            ExecutionException, JebException
    public void rebuldIndexes() throws InitializationException,
        DatabaseException, InterruptedException, ExecutionException,
        JebException
    {
      phaseOne();
      if(isPhaseOneCanceled)
      if (isPhaseOneCanceled)
      {
        throw new InterruptedException("Rebuild Index canceled.");
      }
      phaseTwo();
      if(rebuildAll)
      if (rebuildAll)
      {
        setAllIndexesTrusted();
      }
@@ -3455,11 +3251,12 @@
    }
    private void phaseTwo() throws InterruptedException, JebException,
            ExecutionException
    private void phaseTwo() throws InitializationException,
        InterruptedException, JebException, ExecutionException
    {
      SecondPhaseProgressTask progressTask =
              new SecondPhaseProgressTask(entriesProcessed.get());
      SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask(
          entriesProcessed.get());
      Timer timer2 = new Timer();
      timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
      processIndexFiles();
@@ -4061,7 +3858,7 @@
   * This class reports progress of rebuild index processing at fixed
   * intervals.
   */
  class RebuildFirstPhaseProgressTask extends TimerTask
  private class RebuildFirstPhaseProgressTask extends TimerTask
  {
    /**
     * The number of records that had been processed at the time of the
@@ -4299,7 +4096,7 @@
   * This class reports progress of the second phase of import processing at
   * fixed intervals.
   */
  class SecondPhaseProgressTask extends TimerTask
  private class SecondPhaseProgressTask extends TimerTask
  {
    /**
     * The number of entries that had been read at the time of the