mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

dugan
17.41.2008 272f803e2dbe9da48152ea61814e43e387146f8b
These changes remove the temporary file limitation from import-ldif. Several other changes were made also:

- the cleaner is run at the end of the import

- the cleaner is run periodically during import if database eviction is detected

- the substring indexes are buffered to help boost performance during substring index processing

- the import files have been moved into its own package org.opends.server.backends.jeb.importLDIF

- the work threads do most of the processing

- import aborts if a work thread throws a runtime exception

- messages for the various stages of the import have been added (e.g. environment close)

The only functionality missing is VLV index processing. Also, there is a 2G limit on the
max entry ID value that can be used in a substring index.

The following configuration attributes have been removed:

- ds-cfg-import-temp-directory
- ds-cfg-import-buffer-size
- ds-cfg-import-pass-size

The should be removed from your config.ldif file.
6 files deleted
7 files added
1 files renamed
12 files modified
6786 ■■■■ changed files
opends/resource/config/config.ldif 3 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif 18 ●●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml 97 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/jeb.properties 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java 45 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/AttributeIndexBuilder.java 379 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/BackendImpl.java 42 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/EntryContainer.java 26 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/ID2Entry.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/ImportJob.java 1310 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/ImportThread.java 337 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/Index.java 90 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/IndexMergeThread.java 460 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/JebFormat.java 24 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/RootContainer.java 31 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/VLVIndexBuilder.java 311 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/VLVIndexMergeThread.java 473 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java 359 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java 236 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java 83 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java 1096 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java 354 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java 405 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java 104 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java 457 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/resource/config-changes.ldif 15 ●●●●● patch | view | raw | blame | history
opends/resource/config/config.ldif
@@ -181,10 +181,7 @@
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-subtree-delete-batch-size: 5000
ds-cfg-preload-time-limit: 0 seconds
ds-cfg-import-temp-directory: import-tmp
ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
opends/resource/schema/02-config.ldif
@@ -1085,16 +1085,6 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.220
  NAME 'ds-cfg-import-temp-directory'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.221
  NAME 'ds-cfg-import-buffer-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.222
  NAME 'ds-cfg-import-queue-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -1155,11 +1145,6 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.234
  NAME 'ds-cfg-import-pass-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.235
  NAME 'ds-cfg-replication-server-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -2241,13 +2226,10 @@
  MAY ( ds-cfg-index-entry-limit $
        ds-cfg-subtree-delete-size-limit $
        ds-cfg-preload-time-limit $
        ds-cfg-import-temp-directory $
        ds-cfg-import-buffer-size $
        ds-cfg-import-queue-size $
        ds-cfg-import-thread-count $
        ds-cfg-entries-compressed $
        ds-cfg-deadlock-retry-limit $
        ds-cfg-import-pass-size $
        ds-cfg-db-directory-permissions $
        ds-cfg-db-cache-percent $
        ds-cfg-subtree-delete-batch-size $
opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
@@ -246,70 +246,6 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="import-buffer-size" advanced="true">
    <adm:synopsis>
      Specifies the amount of memory that should be used as an internal
      buffer for index information when processing an LDIF import.
    </adm:synopsis>
    <adm:requires-admin-action>
      <adm:none>
        <adm:synopsis>
          Changes do not take effect for any import that may already
          be in progress.
        </adm:synopsis>
      </adm:none>
    </adm:requires-admin-action>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>256mb</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:size lower-limit="10mb" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-import-buffer-size</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="import-pass-size" advanced="true">
    <adm:synopsis>
      Specifies the maximum number of entries that should be imported in
      each import pass.
    </adm:synopsis>
    <adm:description>
      An import pass consists of the processing required to import a set
      of entries as well as the index post-processing required to index
      those entries. A value of zero for this property indicates that
      all entries should be processed in a single pass, which is the
      recommended configuration for most deployments, although a
      non-zero value may be required when importing a very large number
      of entries if the amount of memory required for index
      post-processing exceeds the total amount available to the server.
    </adm:description>
    <adm:requires-admin-action>
      <adm:none>
        <adm:synopsis>
          Changes do not take effect for any import that may already
          be in progress.
        </adm:synopsis>
      </adm:none>
    </adm:requires-admin-action>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>0</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="0" upper-limit="2147483647" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-import-pass-size</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="import-queue-size" advanced="true">
    <adm:synopsis>
      Specifies the size (in number of entries) of the queue that is 
@@ -337,39 +273,6 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="import-temp-directory" mandatory="true">
    <adm:synopsis>
      Specifies the location of the directory that is used to hold
      temporary information during the index post-processing phase of an LDIF import.
    </adm:synopsis>
    <adm:description>
      The specified directory is only used while an import is in
      progress and the files created in this directory are deleted
      as they are processed. It may be an absolute path or one that is
      relative to the instance root directory.
    </adm:description>
    <adm:requires-admin-action>
      <adm:none>
        <adm:synopsis>
          Changes do not take effect for any import that may already
          be in progress.
        </adm:synopsis>
      </adm:none>
    </adm:requires-admin-action>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>import-tmp</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:string />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-import-temp-directory</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="import-thread-count" advanced="true">
    <adm:synopsis>
      Specifies the number of threads that is used for concurrent
opends/src/messages/messages/jeb.properties
@@ -354,3 +354,32 @@
INFO_JEB_IMPORT_STARTING_173=%s starting import (build %s, R%d)
SEVERE_ERR_JEB_DIRECTORY_DOES_NOT_EXIST_174=The backend database directory \
 '%s' does not exist
SEVERE_ERR_JEB_IMPORT_LDIF_ABORT_175=The import was aborted because an \
  uncaught exception was thrown during processing
INFO_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE_176=Import LDIF environment close \
  took %d seconds
INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH_177=Begin substring buffer flush of %d \
  elements. Buffer total access: %d  buffer hits: %d
INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED_178=Substring buffer flush \
  completed in %d seconds
INFO_JEB_IMPORT_LDIF_FINAL_CLEAN_179=Begin final cleaner run
INFO_JEB_IMPORT_LDIF_CLEAN_180=Begin cleaner run
INFO_JEB_IMPORT_LDIF_CLEANER_RUN_DONE_181=Cleaner run took %d seconds %d logs \
  removed
INFO_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS_182=Cleaner will remove %d logs
INFO_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM_184=Available buffer memory %d bytes is \
  below the minimum value of %d bytes. Setting available buffer memory to \
  the minimum
INFO_JEB_IMPORT_LDIF_MEMORY_INFO_185=Using DB cache bytes: %d available \
  substring buffer bytes: %d
INFO_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM_186=Available buffer memory %d \
  bytes is below the minimum value of %d bytes allowed for a single import \
  context. Setting context available buffer memory to the minimum
INFO_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS_187=Checkpoints performed: %d
INFO_JEB_IMPORT_LDIF_CLEANER_STATS_188=Cleaner runs: %d files deleted: %d \
  entries read: %d IN nodes cleaned: %d
INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS_189=Eviction in progress. Passes: \
  %d nodes evicted: %d BIN nodes stripped: %d
INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED_190=Eviction detected after importing \
  %d entries
opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java
@@ -1631,4 +1631,49 @@
    builder.append(indexConfig.getAttribute().getNameOrOID());
    return builder.toString();
  }
  /**
   * Return the equality index.
   *
   * @return The equality index.
   */
  public Index getEqualityIndex() {
    return  equalityIndex;
  }
  /**
   * Return the approximate index.
   *
   * @return The approximate index.
   */
  public Index getApproximateIndex() {
    return approximateIndex;
  }
  /**
   * Return the ordering index.
   *
   * @return  The ordering index.
   */
  public Index getOrderingIndex() {
    return orderingIndex;
  }
  /**
   * Return the substring index.
   *
   * @return The substring index.
   */
  public Index getSubstringIndex() {
    return substringIndex;
  }
  /**
   * Return the presence index.
   *
   * @return The presence index.
   */
  public Index getPresenceIndex() {
    return presenceIndex;
  }
}
opends/src/server/org/opends/server/backends/jeb/AttributeIndexBuilder.java
File was deleted
opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
@@ -74,6 +74,7 @@
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.types.DN;
import org.opends.server.backends.jeb.importLDIF.Importer;
/**
 * This is an implementation of a Directory Server Backend which stores entries
@@ -1167,25 +1168,8 @@
    {
      EnvironmentConfig envConfig =
          ConfigurableEnvironment.parseConfigEntry(cfg);
      /**
       envConfig.setConfigParam("je.env.runCleaner", "false");
       envConfig.setConfigParam("je.log.numBuffers", "2");
       envConfig.setConfigParam("je.log.bufferSize", "15000000");
       envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
       envConfig.setConfigParam("je.log.fileMax", "100000000");
       **/
      if (importConfig.appendToExistingData())
      {
        envConfig.setReadOnly(false);
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(true);
        envConfig.setTxnNoSync(true);
        envConfig.setConfigParam("je.env.isLocking", "true");
        envConfig.setConfigParam("je.env.runCheckpointer", "false");
      }
      else if(importConfig.clearBackend() || cfg.getBaseDN().size() <= 1)
      {
      if(!importConfig.appendToExistingData()) {
        if(importConfig.clearBackend() || cfg.getBaseDN().size() <= 1) {
        // We have the writer lock on the environment, now delete the
        // environment and re-open it. Only do this when we are
        // importing to all the base DNs in the backend or if the backend only
@@ -1193,23 +1177,21 @@
        File parentDirectory = getFileForPath(cfg.getDBDirectory());
        File backendDirectory = new File(parentDirectory, cfg.getBackendId());
        // If the backend does not exist the import will create it.
        if (backendDirectory.exists())
        {
          if (backendDirectory.exists()) {
          EnvManager.removeFiles(backendDirectory.getPath());
        }
        }
      }
        envConfig.setReadOnly(false);
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(false);
        envConfig.setTxnNoSync(false);
        envConfig.setConfigParam("je.env.isLocking", "false");
        envConfig.setConfigParam("je.env.runCheckpointer", "false");
      }
      Importer importer = new Importer(importConfig);
      envConfig.setConfigParam("je.maxMemory", importer.getDBCacheSize());
      rootContainer = initializeRootContainer(envConfig);
      ImportJob importJob = new ImportJob(importConfig);
      return importJob.importLDIF(rootContainer);
      return importer.processImport(rootContainer);
    }
    catch (IOException ioe)
    {
@@ -1263,7 +1245,13 @@
      {
        if (rootContainer != null)
        {
          long startTime = System.currentTimeMillis();
          rootContainer.close();
          long finishTime = System.currentTimeMillis();
          long closeTime = (finishTime - startTime) / 1000;
          Message msg =
                       INFO_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE.get(closeTime);
          logError(msg);
          rootContainer = null;
        }
opends/src/server/org/opends/server/backends/jeb/EntryContainer.java
@@ -667,6 +667,16 @@
    return attrIndexMap.get(attrType);
  }
  /**
   * Return attribute index map.
   *
   * @return The attribute index map.
   */
  public Map<AttributeType, AttributeIndex> getAttributeIndexMap() {
    return attrIndexMap;
  }
  /**
   * Look for an VLV index for the given index name.
   *
@@ -4657,4 +4667,20 @@
    }
    return matchedDN;
  }
  /**
   * Get the exclusive lock.
   *
   */
  public void lock() {
    exclusiveLock.lock();
  }
  /**
   * Unlock the exclusive lock.
   */
  public void unlock() {
    exclusiveLock.unlock();
  }
}
opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
@@ -103,7 +103,7 @@
   * @throws  DirectoryException  If a problem occurs while attempting to encode
   *                              the entry.
   */
  private DatabaseEntry entryData(Entry entry)
  public DatabaseEntry entryData(Entry entry)
          throws DirectoryException
  {
    byte[] entryBytes;
opends/src/server/org/opends/server/backends/jeb/ImportJob.java
File was deleted
opends/src/server/org/opends/server/backends/jeb/ImportThread.java
File was deleted
opends/src/server/org/opends/server/backends/jeb/Index.java
@@ -34,6 +34,8 @@
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
import org.opends.server.backends.jeb.importLDIF.IntegerImportIDSet;
import org.opends.server.backends.jeb.importLDIF.ImportIDSet;
import static org.opends.messages.JebMessages.*;
import java.util.*;
@@ -112,6 +114,7 @@
   */
  private boolean rebuildRunning = false;
  /**
   * Create a new index object.
   * @param name The name of the index database within the entryContainer.
@@ -258,6 +261,65 @@
    return success;
  }
  /**
   * Add the specified import ID set to the provided key. Used during
   * substring buffer flushing.
   *
   * @param txn A transaction.
   * @param key The key to add the set to.
   * @param importIdSet The set of import IDs.
   * @throws DatabaseException If an database error occurs.
   */
  public void insert(Transaction txn, DatabaseEntry key,
                     ImportIDSet importIdSet)
  throws DatabaseException {
    OperationStatus status;
    DatabaseEntry data = new DatabaseEntry();
      status = read(txn, key, data, LockMode.RMW);
      if(status == OperationStatus.SUCCESS) {
        ImportIDSet newImportIDSet = new IntegerImportIDSet();
        if (newImportIDSet.merge(data.getData(), importIdSet, indexEntryLimit))
        {
          entryLimitExceededCount++;
        }
        data.setData(newImportIDSet.toDatabase());
      } else if(status == OperationStatus.NOTFOUND) {
        if(!importIdSet.isDefined()) {
          entryLimitExceededCount++;
        }
        data.setData(importIdSet.toDatabase());
      } else {
        //Should never happen during import.
        throw new DatabaseException();
      }
      put(txn,key, data);
  }
  /**
   * Add the specified entry ID to the provided keys in the keyset.
   *
   * @param txn  A transaction.
   * @param keySet  The set containing the keys.
   * @param entryID The entry ID.
   * @return <CODE>True</CODE> if the insert was successful.
   * @throws DatabaseException If a database error occurs.
   */
  public synchronized
  boolean insert(Transaction txn, Set<byte[]> keySet, EntryID entryID)
  throws DatabaseException {
    for(byte[] key : keySet) {
      if(insertIDWithRMW(txn, new DatabaseEntry(key), new DatabaseEntry(),
              entryID.getDatabaseEntry(), entryID) !=
              OperationStatus.SUCCESS)  {
        return false;
      }
    }
    return true;
  }
  private OperationStatus insertIDWithRMW(Transaction txn, DatabaseEntry key,
                                          DatabaseEntry data,
                                          DatabaseEntry entryIDData,
@@ -366,6 +428,25 @@
    }
  }
  /**
   * Delete specified entry ID from all keys in the provided key set.
   *
   * @param txn  A Transaction.
   * @param keySet A set of keys.
   * @param entryID The entry ID to delete.
   * @throws DatabaseException If a database error occurs.
   */
  public synchronized
  void delete(Transaction txn, Set<byte[]> keySet, EntryID entryID)
  throws DatabaseException {
    setTrusted(txn, false);
    for(byte[] key : keySet) {
       removeIDWithRMW(txn, new DatabaseEntry(key),
                       new DatabaseEntry(), entryID);
    }
    setTrusted(txn, true);
  }
  private void removeIDWithRMW(Transaction txn, DatabaseEntry key,
                               DatabaseEntry data, EntryID entryID)
      throws DatabaseException
@@ -833,6 +914,15 @@
  }
  /**
   * Return entry limit.
   *
   * @return The entry limit.
   */
  public int getIndexEntryLimit() {
    return this.indexEntryLimit;
  }
  /**
   * Set the index trust state.
   * @param txn A database transaction, or null if none is required.
   * @param trusted True if this index should be trusted or false
opends/src/server/org/opends/server/backends/jeb/IndexMergeThread.java
File was deleted
opends/src/server/org/opends/server/backends/jeb/JebFormat.java
@@ -365,6 +365,30 @@
  }
  /**
   * Decode a integer array using the specified byte array read from DB.
   *
   * @param bytes The byte array.
   * @return An integer array.
   */
  public static int[] intArrayFromDatabaseBytes(byte[] bytes) {
    byte[] decodedBytes = bytes;
    int count = decodedBytes.length / 8;
    int[] entryIDList = new int[count];
    for (int pos = 0, i = 0; i < count; i++) {
      int v = 0;
      pos +=4;
      v |= (decodedBytes[pos++] & 0xFFL) << 24;
      v |= (decodedBytes[pos++] & 0xFFL) << 16;
      v |= (decodedBytes[pos++] & 0xFFL) << 8;
      v |= (decodedBytes[pos++] & 0xFFL);
      entryIDList[i] = v;
    }
    return entryIDList;
  }
  /**
   * Encode an entry ID value to its database representation.
   * @param id The entry ID value to be encoded.
   * @return The encoded database value of the entry ID.
opends/src/server/org/opends/server/backends/jeb/RootContainer.java
@@ -26,17 +26,14 @@
 */
package org.opends.server.backends.jeb;
import org.opends.messages.Message;
import com.sleepycat.je.config.EnvironmentParams;
import com.sleepycat.je.config.ConfigParam;
import com.sleepycat.je.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.*;
import java.io.File;
import java.io.FilenameFilter;
import org.opends.server.monitors.DatabaseEnvironmentMonitor;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DN;
@@ -48,7 +45,6 @@
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.core.DirectoryServer;
import org.opends.server.config.ConfigException;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -79,6 +75,9 @@
   */
  private Environment env;
  //Used to force a checkpoint during import.
  private CheckpointConfig importForceCheckPoint = new CheckpointConfig();
  /**
   * The backend configuration.
   */
@@ -129,6 +128,7 @@
    this.compressedSchema = null;
    config.addLocalDBChangeListener(this);
    importForceCheckPoint.setForce(true);
  }
  /**
@@ -1029,4 +1029,27 @@
                                 messages);
    return ccr;
  }
  /**
   * Force a checkpoint.
   *
   * @throws DatabaseException If a database error occurs.
   */
  public void importForceCheckPoint() throws DatabaseException {
    env.checkpoint(importForceCheckPoint);
  }
  /**
   * Run the cleaner and return the number of files cleaned.
   *
   * @return The number of logs cleaned.
   * @throws DatabaseException If a database error occurs.
   */
  public int cleanedLogFiles() throws DatabaseException {
    int cleaned, totalCleaned = 0;
    while((cleaned = env.cleanLog()) > 0) {
      totalCleaned += cleaned;
    }
    return totalCleaned;
  }
}
opends/src/server/org/opends/server/backends/jeb/VLVIndexBuilder.java
File was deleted
opends/src/server/org/opends/server/backends/jeb/VLVIndexMergeThread.java
File was deleted
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
New file
@@ -0,0 +1,359 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.types.Entry;
import org.opends.server.backends.jeb.Index;
import org.opends.server.backends.jeb.EntryID;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.dbi.MemoryBudget;
import static org.opends.server.loggers.ErrorLogger.logError;
import org.opends.messages.Message;
import static org.opends.messages.JebMessages.*;
import java.util.*;
/**
 * Manages a shared cache among worker threads that caches substring
 * key/value pairs to avoid DB cache access. Once the cache is above it's
 * memory usage limit, it will start slowly flushing keys (similar to the
 * JEB eviction process) until it is under the limit.
 */
public class BufferManager {
  //Memory usage counter.
  private long memoryUsage=0;
  //Memory limit.
  private long memoryLimit;
  //Next element in the cache to start flushing at during next flushAll cycle.
  private KeyHashElement nextElem;
  //Extra bytes to flushAll.
  private final int extraBytes  = 1024 * 1024;
  //Counters for statistics, total is number of accesses, hit is number of
  //keys found in cache.
  private long total=0, hit=0;
  //Actual map used to buffer keys.
  private TreeMap<KeyHashElement, KeyHashElement> elementMap =
                        new TreeMap<KeyHashElement, KeyHashElement>();
  //Overhead values determined from using JHAT. They appear to be the same
  //for both 32 and 64 bit machines. Close enough.
  private final static int TREEMAP_ENTRY_OVERHEAD = 29;
  private final static int KEY_ELEMENT_OVERHEAD = 28;
  //Count of number of worker threads.
  private int importThreadCount;
  //Used to prevent memory flush
  private boolean limitFlush = true;
  /**
   * Create buffer manager instance.
   *
   * @param memoryLimit The memory limit.
   * @param importThreadCount  The count of import worker threads.
   */
  public BufferManager(long memoryLimit, int importThreadCount) {
    this.memoryLimit = memoryLimit;
    this.nextElem = null;
    this.importThreadCount = importThreadCount;
  }
  /**
   * Insert an entry ID into the buffer using the both the specified index and
   * entry to build a key set. Will flush the buffer if over the memory limit.
   *
   * @param index  The index to use.
   * @param entry The entry used to build the key set.
   * @param entryID The entry ID to insert into the key set.
   * @param txn A transaction.
   * @throws DatabaseException If a problem happened during a flushAll cycle.
   */
  void insert(Index index, Entry entry,
                     EntryID entryID, Transaction txn)
  throws DatabaseException {
     int entryLimit = index.getIndexEntryLimit();
     Set<byte[]> keySet = new HashSet<byte[]>();
     index.indexer.indexEntry(txn, entry, keySet);
    synchronized(elementMap) {
       for(byte[] key : keySet) {
         KeyHashElement elem = new KeyHashElement(key, index, entryID);
         total++;
         if(!elementMap.containsKey(elem)) {
            elementMap.put(elem, elem);
            memoryUsage += TREEMAP_ENTRY_OVERHEAD + elem.getMemorySize();
         } else {
           KeyHashElement curElem = elementMap.get(elem);
           if(curElem.isDefined()) {
            int oldSize = curElem.getMemorySize();
            curElem.addEntryID(entryID, entryLimit);
            int newSize = curElem.getMemorySize();
            //Might have moved from defined to undefined.
            memoryUsage += (newSize - oldSize);
            hit++;
           }
         }
       }
       //If over the memory limit and import hasn't completed
      //flush some keys from the cache to make room.
       if((memoryUsage > memoryLimit) && limitFlush) {
         flushUntilUnderLimit();
       }
    }
  }
  /**
   * Flush the buffer to DB until the buffer is under the memory limit.
   *
   * @throws DatabaseException If a problem happens during an index insert.
   */
  private void flushUntilUnderLimit() throws DatabaseException {
    Iterator<KeyHashElement> iter;
    if(nextElem == null) {
      iter = elementMap.keySet().iterator();
    } else {
      iter = elementMap.tailMap(nextElem).keySet().iterator();
    }
    while((memoryUsage + extraBytes > memoryLimit) && limitFlush) {
      if(iter.hasNext()) {
        KeyHashElement curElem = iter.next();
        //Never flush undefined elements.
        if(curElem.isDefined()) {
          Index index = curElem.getIndex();
          index.insert(null, new DatabaseEntry(curElem.getKey()),
                  curElem.getIDSet());
          memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
          if(limitFlush) {
            iter.remove();
          }
        }
      } else {
        //Wrapped around, start at the first element.
        nextElem = elementMap.firstKey();
        iter = elementMap.keySet().iterator();
      }
    }
    //Start at this element next flushAll cycle.
    nextElem = iter.next();
  }
  /**
   * Called from main thread to prepare for final buffer flush at end of
   * ldif load.
   */
  void prepareFlush() {
    limitFlush=false;
    Message msg =
           INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
    logError(msg);
  }
  /**
   * Writes all of the buffer elements to DB. The specific id is used to
   * share the buffer among the worker threads so this function can be
   * multi-threaded.
   *
   * @param id The thread id.
   * @throws DatabaseException If an error occurred during the insert.
   */
  void flushAll(int id) throws DatabaseException {
    TreeSet<KeyHashElement>  tSet =
            new TreeSet<KeyHashElement>(elementMap.keySet());
    Iterator<KeyHashElement> iter = tSet.iterator();
    int i=0;
    while(iter.hasNext()) {
      KeyHashElement curElem = iter.next();
      Index index = curElem.getIndex();
      //Each thread handles a piece of the buffer based on its thread id.
      if((i % importThreadCount) == id) {
        index.insert(null, new DatabaseEntry(curElem.getKey()),
                curElem.getIDSet());
      }
      i++;
    }
  }
  /**
   *  Class used to represent an element in the buffer.
   */
  class KeyHashElement implements Comparable {
    //Bytes representing the key.
    private  byte[] key;
    //Hash code returned from the System.identityHashCode method on the index
    //object.
    private int indexHashCode;
    //Index related to the element.
    private Index index;
    //The set of IDs related to the key.
    private ImportIDSet importIDSet;
    /**
     * Create instance of an element for the specified key and index, the add
     * the specified entry ID to the ID set.
     *
     * @param key The key.
     * @param index The index.
     * @param entryID The entry ID to start off with.
     */
    public KeyHashElement(byte[] key, Index index, EntryID entryID) {
      this.key = key;
      this.index = index;
      //Use the integer set for right now. This is good up to 2G number of
      //entries. There is also a LongImportSet, but it currently isn't used.
      this.importIDSet = new IntegerImportIDSet(entryID);
      //Used if there when there are conflicts if two or more indexes have
      //the same key.
      this.indexHashCode = System.identityHashCode(index);
    }
    /**
     * Add an entry ID to the set.
     *
     * @param entryID  The entry ID to add.
     * @param entryLimit The entry limit
     */
    void addEntryID(EntryID entryID, int entryLimit) {
      importIDSet.addEntryID(entryID, entryLimit);
    }
    /**
     * Return the index.
     *
     * @return The index.
     */
    Index getIndex(){
      return index;
    }
    /**
     * Return the key.
     *
     * @return The key.
     */
    byte[] getKey() {
      return key;
    }
    /**
     * Return the ID set.
      * @return The import ID set.
     */
    ImportIDSet getIDSet() {
      return importIDSet;
    }
    /**
     * Return if the ID set is defined or not.
     *
     * @return <CODE>True</CODE> if the ID set is defined.
     */
    boolean isDefined() {
      return importIDSet.isDefined();
    }
    /**
     * Compare the bytes of two keys.
     *
     * @param a  Key a.
     * @param b  Key b.
     * @return  0 if the keys are equal, -1 if key a is less than key b, 1 if
     *          key a is greater than key b.
     */
    private int compare(byte[] a, byte[] b) {
      int i;
      for (i = 0; i < a.length && i < b.length; i++) {
        if (a[i] > b[i]) {
          return 1;
        }
        else if (a[i] < b[i]) {
          return -1;
        }
      }
      if (a.length == b.length) {
        return 0;
      }
      if (a.length > b.length){
        return 1;
      }
      else {
        return -1;
      }
    }
    /**
     * Compare the specified object to the current object. If the keys are
     * equal, then the indexHashCode value is used as a tie-breaker.
     *
     * @param o The object representing a KeyHashElement.
     * @return 0 if the objects are equal, -1 if the current object is less
     *         than the specified object, 1 otherwise.
     */
    public int compareTo(Object o) {
      if (o == null) {
        throw new NullPointerException();
      }
      KeyHashElement inElem = (KeyHashElement) o;
      int keyCompare = compare(key, inElem.key);
      if(keyCompare == 0) {
        if(indexHashCode == inElem.indexHashCode) {
          return 0;
        } else if(indexHashCode < inElem.indexHashCode) {
          return -1;
        } else {
          return 1;
        }
      } else {
        return keyCompare;
      }
    }
    /**
     * Return the current total memory size of the element.
     * @return The memory size estimate of a KeyHashElement.
     */
    int getMemorySize() {
      return  KEY_ELEMENT_OVERHEAD +
              MemoryBudget.byteArraySize(key.length) +
              importIDSet.getMemorySize();
    }
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java
File was renamed from opends/src/server/org/opends/server/backends/jeb/ImportContext.java
@@ -24,24 +24,27 @@
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb;
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.AttributeType;
import org.opends.server.util.LDIFReader;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
import org.opends.server.backends.jeb.*;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
/**
 * This class represents the import context for a destination base DN.
 */
public class ImportContext
{
public class DNContext {
  /**
   * The destination base DN.
@@ -84,15 +87,35 @@
  private EntryContainer srcEntryContainer;
  /**
   * The amount of buffer memory available in bytes.
   */
  private long bufferSize;
  /**
   * A queue of entries that have been read from the LDIF and are ready
   * A queue of elements that have been read from the LDIF and are ready
   * to be imported.
   */
  private BlockingQueue<Entry> queue;
  private BlockingQueue<WorkElement> workQueue;
  //This currently isn't used.
  private ArrayList<VLVIndex> vlvIndexes = new ArrayList<VLVIndex>();
  /**
   * The maximum number of parent ID values that we will remember.
   */
  private static final int PARENT_ID_MAP_SIZE = 100;
  /**
   * Map of likely parent entry DNs to their entry IDs.
   */
  private HashMap<DN,EntryID> parentIDMap =
       new HashMap<DN,EntryID>(PARENT_ID_MAP_SIZE);
  //Map of pending DNs added to the work queue. Used to check if a parent
  //entry has been added, but isn't in the dn2id DB.
  private ConcurrentHashMap<DN,DN> pendingMap =
                                              new ConcurrentHashMap<DN, DN>() ;
  //Used to synchronize the parent ID map, since multiple worker threads
  //can be accessing it.
  private Object synchObject = new Object();
  /**
   * The number of LDAP entries added to the database, used to update the
@@ -114,22 +137,28 @@
   */
  private ArrayList<EntryID> IDs;
  /**
   * Get the import entry queue.
   * @return The import entry queue.
   */
  public BlockingQueue<Entry> getQueue()
  {
    return queue;
  }
  //The buffer manager used to hold the substring cache.
  private BufferManager bufferManager;
  /**
   * Set the import entry queue.
   * @param queue The import entry queue.
   * Get the work queue.
   *
   * @return  The work queue.
   */
  public void setQueue(BlockingQueue<Entry> queue)
  {
    this.queue = queue;
  public BlockingQueue<WorkElement> getWorkQueue() {
      return workQueue;
    }
  /**
   * Set the work queue to the specified work queue.
   *
   * @param workQueue The work queue.
   */
  public void
   setWorkQueue(BlockingQueue<WorkElement> workQueue) {
    this.workQueue = workQueue;
  }
  /**
@@ -242,24 +271,6 @@
  }
  /**
   * Get the available buffer size in bytes.
   * @return The available buffer size.
   */
  public long getBufferSize()
  {
    return bufferSize;
  }
  /**
   * Set the available buffer size in bytes.
   * @param bufferSize The available buffer size in bytes.
   */
  public void setBufferSize(long bufferSize)
  {
    this.bufferSize = bufferSize;
  }
  /**
   * Get the number of new LDAP entries imported into the entry database.
   * @return The number of new LDAP entries imported into the entry database.
   */
@@ -384,4 +395,141 @@
      }
    }
    /**
     * Return the attribute type attribute index map.
     *
     * @return The attribute type attribute index map.
     */
    public Map<AttributeType, AttributeIndex> getAttrIndexMap() {
      return entryContainer.getAttributeIndexMap();
    }
    /**
     * Set all the indexes to trusted.
     *
     * @throws DatabaseException If the trusted value cannot be updated in the
     * index DB.
     */
    public void setIndexesTrusted() throws DatabaseException {
      entryContainer.getID2Children().setTrusted(null,true);
      entryContainer.getID2Subtree().setTrusted(null, true);
      for(AttributeIndex attributeIndex :
          entryContainer.getAttributeIndexes()) {
        Index index;
        if((index = attributeIndex.getEqualityIndex()) != null) {
          index.setTrusted(null, true);
        }
        if((index=attributeIndex.getPresenceIndex()) != null) {
          index.setTrusted(null, true);
        }
        if((index=attributeIndex.getSubstringIndex()) != null) {
          index.setTrusted(null, true);
        }
        if((index=attributeIndex.getOrderingIndex()) != null) {
          index.setTrusted(null, true);
        }
        if((index=attributeIndex.getApproximateIndex()) != null) {
          index.setTrusted(null, true);
        }
      }
    }
    /**
     * Get the Entry ID of the parent entry.
     * @param parentDN  The parent DN.
     * @param dn2id The DN2ID DB.
     * @param txn A database transaction,
     * @return The entry ID of the parent entry.
     * @throws DatabaseException If a DB error occurs.
     */
    public
    EntryID getParentID(DN parentDN, DN2ID dn2id, Transaction txn)
            throws DatabaseException {
      EntryID parentID;
      synchronized(synchObject) {
        parentID = parentIDMap.get(parentDN);
        if (parentID != null) {
          return parentID;
        }
      }
      int i=0;
      //If the parent is in the pending map, another thread is working on the
      //parent entry; wait until that thread is done with the parent.
      while(isPending(parentDN)) {
        try {
          Thread.sleep(50);
          if(i == 3) {
            return null;
          }
          i++;
        } catch (Exception e) {
          return null;
        }
      }
      parentID = dn2id.get(txn, parentDN);
      //If the parent is in dn2id, add it to the cache.
      if (parentID != null) {
        synchronized(synchObject) {
          if (parentIDMap.size() >= PARENT_ID_MAP_SIZE) {
            Iterator<DN> iterator = parentIDMap.keySet().iterator();
            iterator.next();
            iterator.remove();
          }
          parentIDMap.put(parentDN, parentID);
        }
      }
      return parentID;
    }
    /**
     * Check if the parent DN is in the pending map.
     *
     * @param parentDN The DN of the parent.
     * @return <CODE>True</CODE> if the parent is in the pending map.
     */
    private boolean isPending(DN parentDN) {
      boolean ret = false;
      if(pendingMap.containsKey(parentDN)) {
        ret = true;
      }
      return ret;
    }
    /**
     * Add specified DN to the pending map.
     *
     * @param dn The DN to add to the map.
     */
    public void addPending(DN dn) {
      pendingMap.putIfAbsent(dn, dn);
    }
    /**
     * Remove the specified DN from the pending map.
     *
     * @param dn The DN to remove from the map.
     */
    public void removePending(DN dn) {
      pendingMap.remove(dn);
    }
    /**
     * Set the substring buffer manager to the specified buffer manager.
     *
     * @param bufferManager The buffer manager.
     */
    public void setBufferManager(BufferManager bufferManager) {
      this.bufferManager = bufferManager;
    }
    /**
     * Return the buffer manager.
     *
     * @return The buffer manager.
     */
    public BufferManager getBufferManager() {
      return bufferManager;
    }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
New file
@@ -0,0 +1,83 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.backends.jeb.EntryID;
/**
 * Interface defining and import ID set.
 */
public interface ImportIDSet {
  /**
   * Add an entry ID to the set.
   *
   * @param entryID The entry ID to add.
   * @param entryLimit The entry limit.
   */
  public void addEntryID(EntryID entryID, int entryLimit);
  /**
   * Return if a  set is defined or not.
   *
   * @return <CODE>True</CODE> if a set is defined.
   */
  public boolean isDefined();
  /**
   * Return the memory size of a set.
   *
   * @return The sets current memory size.
   */
  public int getMemorySize();
  /**
   * Convert a set to a byte array suitable for saving to DB.
   *
   * @return A byte array representing the set.
   */
  public byte[] toDatabase();
  /**
   * Return the size of the set.
   *
   * @return The size of the ID set.
   */
  public int size();
  /**
   * Merge a byte array read from DB with a ID set.
   *
   * @param dbBytes The byte array read from DB.
   * @param bufImportIDSet The import ID set to merge.
   * @param entryLimit The entry limit.
   * @return <CODE>True</CODE> if the merged set is undefined.
   */
  public boolean merge(byte[] dbBytes, ImportIDSet bufImportIDSet,
                       int entryLimit);
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
New file
@@ -0,0 +1,1096 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.types.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.LDIFException;
import org.opends.server.util.RuntimeInformation;
import static org.opends.server.util.DynamicConstants.BUILD_ID;
import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.backends.jeb.*;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.messages.Message;
import org.opends.messages.JebMessages;
import static org.opends.messages.JebMessages.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.io.IOException;
import com.sleepycat.je.*;
/**
 * Performs a LDIF import.
 */
public class Importer implements Thread.UncaughtExceptionHandler {
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The JE backend configuration.
   */
  private LocalDBBackendCfg config;
  /**
   * The root container used for this import job.
   */
  private RootContainer rootContainer;
  /**
   * The LDIF import configuration.
   */
  private LDIFImportConfig ldifImportConfig;
  /**
   * The LDIF reader.
   */
  private LDIFReader reader;
  /**
   * Map of base DNs to their import context.
   */
  private LinkedHashMap<DN, DNContext> importMap =
      new LinkedHashMap<DN, DNContext>();
  /**
    * The number of entries migrated.
    */
   private int migratedCount;
  /**
   * The number of entries imported.
   */
  private int importedCount;
  /**
   * The number of milliseconds between job progress reports.
   */
  private long progressInterval = 10000;
  /**
   * The progress report timer.
   */
  private Timer timer;
  //Thread array.
  private CopyOnWriteArrayList<WorkThread> threads;
  //Progress task.
  private ProgressTask pTask;
  //Number of entries import before checking if cleaning is needed after
  //eviction has been detected.
  private static final int entryCleanInterval = 250000;
  //Minimum buffer amount to give to a buffer manager.
  private static final long minBuffer = 1024 * 1024;
  //Total available memory for the buffer managers.
  private long totalAvailBufferMemory = 0;
  //Memory size to be used for the DB cache in string format.
  private String dbCacheSizeStr;
  //Used to do an initial clean after eviction has been detected.
  private boolean firstClean=false;
  //A thread threw an Runtime exception stop the import.
  private boolean unCaughtExceptionThrown = false;
  /**
   * Create a new import job with the specified ldif import config.
   *
   * @param ldifImportConfig The LDIF import config.
   */
  public Importer(LDIFImportConfig ldifImportConfig)
  {
    this.ldifImportConfig = ldifImportConfig;
    this.threads = new CopyOnWriteArrayList<WorkThread>();
    calcMemoryLimits();
  }
  /**
   * Start the worker threads.
   *
   * @throws DatabaseException If a DB problem occurs.
   */
  private void startWorkerThreads()
          throws DatabaseException {
    int importThreadCount = config.getImportThreadCount();
    //Figure out how much buffer memory to give to each context.
    int contextCount = importMap.size();
    long memoryPerContext = totalAvailBufferMemory / contextCount;
    //Below min, use the min value.
    if(memoryPerContext < minBuffer) {
      Message msg =
            INFO_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
                                                             minBuffer);
      logError(msg);
      memoryPerContext = minBuffer;
    }
    // Create one set of worker threads/buffer managers for each base DN.
    for (DNContext context : importMap.values()) {
      BufferManager bufferManager = new BufferManager(memoryPerContext,
                                                      importThreadCount);
      context.setBufferManager(bufferManager);
      for (int i = 0; i < importThreadCount; i++) {
        WorkThread t = new WorkThread(context.getWorkQueue(), i,
                                      bufferManager, rootContainer);
        t.setUncaughtExceptionHandler(this);
        threads.add(t);
        t.start();
      }
    }
    // Start a timer for the progress report.
    timer = new Timer();
    TimerTask progressTask = new ProgressTask();
    //Used to get at extra functionality such as eviction detected.
    pTask = (ProgressTask) progressTask;
    timer.scheduleAtFixedRate(progressTask, progressInterval,
                              progressInterval);
  }
  /**
   * Import a ldif using the specified root container.
   *
   * @param rootContainer  The root container.
   * @return A LDIF result.
   * @throws DatabaseException  If a DB error occurs.
   * @throws IOException If a IO error occurs.
   * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
   * @throws DirectoryException If a directory error occurs.
   * @throws ConfigException If a configuration has an error.
   */
  public LDIFImportResult processImport(RootContainer rootContainer)
      throws DatabaseException, IOException, JebException, DirectoryException,
            ConfigException {
    // Create an LDIF reader. Throws an exception if the file does not exist.
    reader = new LDIFReader(ldifImportConfig);
    this.rootContainer = rootContainer;
    this.config = rootContainer.getConfiguration();
    Message message;
    long startTime;
    try {
      int importThreadCount = config.getImportThreadCount();
      message = INFO_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
                                                     BUILD_ID, REVISION_NUMBER);
      logError(message);
      message = INFO_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
      logError(message);
      RuntimeInformation.logInfo();
      for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
        DNContext DNContext =  getImportContext(entryContainer);
        if(DNContext != null) {
          importMap.put(entryContainer.getBaseDN(), DNContext);
        }
      }
      // Make a note of the time we started.
      startTime = System.currentTimeMillis();
      startWorkerThreads();
      try {
        importedCount = 0;
        migratedCount = 0;
        migrateExistingEntries();
        processLDIF();
        migrateExcludedEntries();
      } finally {
        if(!unCaughtExceptionThrown) {
          cleanUp();
          switchContainers();
        }
      }
    }
    finally {
      reader.close();
    }
    importProlog(startTime);
    return new LDIFImportResult(reader.getEntriesRead(),
                                reader.getEntriesRejected(),
                                reader.getEntriesIgnored());
  }
  /**
   * Switch containers if the migrated entries were written to the temporary
   * container.
   *
   * @throws DatabaseException If a DB problem occurs.
   * @throws JebException If a JEB problem occurs.
   */
  private void switchContainers() throws DatabaseException, JebException {
    for(DNContext importContext : importMap.values()) {
      DN baseDN = importContext.getBaseDN();
      EntryContainer srcEntryContainer =
              importContext.getSrcEntryContainer();
      if(srcEntryContainer != null) {
        if (debugEnabled()) {
          TRACER.debugInfo("Deleteing old entry container for base DN " +
                  "%s and renaming temp entry container", baseDN);
        }
        EntryContainer unregEC =
                rootContainer.unregisterEntryContainer(baseDN);
        //Make sure the unregistered EC for the base DN is the same as
        //the one in the import context.
        if(unregEC != srcEntryContainer) {
          if(debugEnabled()) {
            TRACER.debugInfo("Current entry container used for base DN " +
                    "%s is not the same as the source entry container used " +
                    "during the migration process.", baseDN);
          }
          rootContainer.registerEntryContainer(baseDN, unregEC);
          continue;
        }
        srcEntryContainer.lock();
        srcEntryContainer.delete();
        srcEntryContainer.unlock();
        EntryContainer newEC = importContext.getEntryContainer();
        newEC.lock();
        newEC.setDatabasePrefix(baseDN.toNormalizedString());
        newEC.unlock();
        rootContainer.registerEntryContainer(baseDN, newEC);
      }
    }
  }
  /**
   * Create and log messages at the end of the successful import.
   *
   * @param startTime The time the import started.
   */
  private void importProlog(long startTime) {
    Message message;
    long finishTime = System.currentTimeMillis();
    long importTime = (finishTime - startTime);
    float rate = 0;
    if (importTime > 0)
    {
      rate = 1000f*importedCount / importTime;
    }
    message = INFO_JEB_IMPORT_FINAL_STATUS.
        get(reader.getEntriesRead(), importedCount,
            reader.getEntriesIgnored(), reader.getEntriesRejected(),
            migratedCount, importTime/1000, rate);
    logError(message);
    message = INFO_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
        getEntryLimitExceededCount());
    logError(message);
  }
  /**
   * Run the cleaner if it is needed.
   *
   * @param entriesRead The number of entries read so far.
   * @param evictEntryNumber The number of entries to run the cleaner after
   * being read.
   * @throws DatabaseException If a DB problem occurs.
   */
  private void
  runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
          throws DatabaseException {
    if(!firstClean || (entriesRead %  evictEntryNumber) == 0) {
      //Make sure work queue is empty before starting.
      drainWorkQueue();
      Message msg = INFO_JEB_IMPORT_LDIF_CLEAN.get();
      runCleaner(msg);
      if(!firstClean) {
        firstClean=true;
      }
    }
  }
  /**
   * Run the cleaner, pausing the task thread output.
   *
   * @param header Message to be printed before cleaning.
   * @throws DatabaseException If a DB problem occurs.
   */
  private void runCleaner(Message header) throws DatabaseException {
    Message msg;
    long startTime = System.currentTimeMillis();
    //Need to force a checkpoint.
    rootContainer.importForceCheckPoint();
    logError(header);
    pTask.setPause(true);
    //Actually clean the files.
    int cleaned = rootContainer.cleanedLogFiles();
    //This checkpoint removes the files if any were cleaned.
    if(cleaned > 0) {
      msg = INFO_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
      logError(msg);
      rootContainer.importForceCheckPoint();
    }
    pTask.setPause(false);
    long finishTime = System.currentTimeMillis();
    long cleanTime = (finishTime - startTime) / 1000;
    msg = INFO_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
    logError(msg);
  }
  /**
   * Process a LDIF reader.
   *
   * @throws JebException If a JEB problem occurs.
   * @throws DatabaseException If a DB problem occurs.
   * @throws IOException If an IO exception occurs.
   */
  private void
  processLDIF() throws JebException, DatabaseException, IOException {
    Message message = INFO_JEB_IMPORT_LDIF_START.get();
    logError(message);
    do {
      if (ldifImportConfig.isCancelled()) {
        break;
      }
      if(threads.size() <= 0) {
        message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
        throw new JebException(message);
      }
      if(unCaughtExceptionThrown) {
        abortImport();
      }
      try {
        // Read the next entry.
        Entry entry = reader.readEntry();
        // Check for end of file.
        if (entry == null) {
          message = INFO_JEB_IMPORT_LDIF_END.get();
          logError(message);
          break;
        }
        // Route it according to base DN.
        DNContext DNContext = getImportConfig(entry.getDN());
        processEntry(DNContext, entry);
        //If the progress task has noticed eviction proceeding, start running
        //the cleaner.
        if(pTask.isEvicting()) {
          runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
        }
      }  catch (LDIFException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      } catch (DirectoryException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      } catch (DatabaseException e)  {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    } while (true);
  }
  /**
   * Process an entry using the specified import context.
   *
   * @param DNContext The import context.
   * @param entry The entry to process.
   */
  private void processEntry(DNContext DNContext, Entry entry) {
    //Add this DN to the pending map.
    DNContext.addPending(entry.getDN());
    addEntryQueue(DNContext, entry);
  }
  /**
   * Add work item to specified import context's queue.
   * @param context The import context.
   * @param item The work item to add.
   * @return <CODE>True</CODE> if the the work  item was added to the queue.
   */
  private boolean
  addQueue(DNContext context, WorkElement item) {
    try {
      while(!context.getWorkQueue().offer(item, 1000,
                                            TimeUnit.MILLISECONDS)) {
        if(threads.size() <= 0) {
          // All worker threads died. We must stop now.
          return false;
        }
      }
    } catch (InterruptedException e) {
      if (debugEnabled()) {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    return true;
  }
  /**
   * Wait until the work queue is empty.
   */
  private void drainWorkQueue() {
    if(threads.size() > 0) {
      for (DNContext context : importMap.values()) {
        while (context.getWorkQueue().size() > 0) {
          try {
            Thread.sleep(100);
          } catch (Exception e) {
            // No action needed.
          }
        }
      }
    }
  }
  private void abortImport() throws JebException {
    //Stop work threads telling them to skip substring flush.
     stopWorkThreads(false, true);
     timer.cancel();
     Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
     throw new JebException(message);
  }
  /**
   * Stop work threads.
   *
   * @param flushBuffer Flag telling threads that it should do substring flush.
   * @param abort <CODE>True</CODE> if stop work threads was called from an
   *              abort.
   * @throws JebException if a Jeb error occurs.
   */
  private void
  stopWorkThreads(boolean flushBuffer, boolean abort) throws JebException {
    for (WorkThread t : threads) {
      if(!flushBuffer) {
        t.setFlush(false);
      }
      t.stopProcessing();
    }
    // Wait for each thread to stop.
    for (WorkThread t : threads) {
      try {
        if(!abort && unCaughtExceptionThrown) {
          timer.cancel();
          Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
          throw new JebException(message);
        }
        t.join();
        importedCount += t.getImportedCount();
      } catch (InterruptedException ie) {
        // No action needed?
      }
    }
  }
  /**
   * Clean up after a successful import.
   *
   * @throws DatabaseException If a DB error occurs.
   * @throws JebException If a Jeb error occurs.
   */
  private void cleanUp() throws DatabaseException, JebException {
     Message msg;
    //Drain the work queue.
    drainWorkQueue();
    //Prepare the buffer managers to flush.
    for(DNContext context : importMap.values()) {
      context.getBufferManager().prepareFlush();
    }
    pTask.setPause(true);
    long startTime = System.currentTimeMillis();
    stopWorkThreads(true, false);
    long finishTime = System.currentTimeMillis();
    long flushTime = (finishTime - startTime) / 1000;
     msg = INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
    logError(msg);
    timer.cancel();
    for(DNContext context : importMap.values()) {
      context.setIndexesTrusted();
    }
    msg = INFO_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
    //Run the cleaner.
    runCleaner(msg);
  }
  /**
   * Uncaught exception handler.
   *
   * @param t The thread working when the exception was thrown.
   * @param e The exception.
   */
  public void uncaughtException(Thread t, Throwable e) {
     unCaughtExceptionThrown = true;
     threads.remove(t);
     Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
         t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
     logError(msg);
   }
  /**
   * Get the entry limit exceeded counts from the indexes.
   *
   * @return Count of the index with entry limit exceeded values.
   */
  private int getEntryLimitExceededCount() {
    int count = 0;
    for (DNContext ic : importMap.values())
    {
      count += ic.getEntryContainer().getEntryLimitExceededCount();
    }
    return count;
  }
  /**
   * Return an import context related to the specified DN.
   * @param dn The dn.
   * @return  An import context.
   * @throws DirectoryException If an directory error occurs.
   */
  private DNContext getImportConfig(DN dn) throws DirectoryException {
    DNContext DNContext = null;
    DN nodeDN = dn;
    while (DNContext == null && nodeDN != null) {
      DNContext = importMap.get(nodeDN);
      if (DNContext == null)
      {
        nodeDN = nodeDN.getParentDNInSuffix();
      }
    }
    if (nodeDN == null) {
      // The entry should not have been given to this backend.
      Message message =
              JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
      throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
    }
    return DNContext;
  }
  /**
   * Creates an import context for the specified entry container.
   *
   * @param entryContainer The entry container.
   * @return Import context to use during import.
   * @throws DatabaseException If a database error occurs.
   * @throws JebException If a JEB error occurs.
   * @throws ConfigException If a configuration contains error.
   */
   private DNContext getImportContext(EntryContainer entryContainer)
      throws DatabaseException, JebException, ConfigException {
    DN baseDN = entryContainer.getBaseDN();
    EntryContainer srcEntryContainer = null;
    List<DN> includeBranches = new ArrayList<DN>();
    List<DN> excludeBranches = new ArrayList<DN>();
    if(!ldifImportConfig.appendToExistingData() &&
        !ldifImportConfig.clearBackend())
    {
      for(DN dn : ldifImportConfig.getExcludeBranches())
      {
        if(baseDN.equals(dn))
        {
          // This entire base DN was explicitly excluded. Skip.
          return null;
        }
        if(baseDN.isAncestorOf(dn))
        {
          excludeBranches.add(dn);
        }
      }
      if(!ldifImportConfig.getIncludeBranches().isEmpty())
      {
        for(DN dn : ldifImportConfig.getIncludeBranches())
        {
          if(baseDN.isAncestorOf(dn))
          {
            includeBranches.add(dn);
          }
        }
        if(includeBranches.isEmpty())
        {
          // There are no branches in the explicitly defined include list under
          // this base DN. Skip this base DN alltogether.
          return null;
        }
        // Remove any overlapping include branches.
        Iterator<DN> includeBranchIterator = includeBranches.iterator();
        while(includeBranchIterator.hasNext())
        {
          DN includeDN = includeBranchIterator.next();
          boolean keep = true;
          for(DN dn : includeBranches)
          {
            if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
            {
              keep = false;
              break;
            }
          }
          if(!keep)
          {
            includeBranchIterator.remove();
          }
        }
        // Remvoe any exclude branches that are not are not under a include
        // branch since they will be migrated as part of the existing entries
        // outside of the include branches anyways.
        Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
        while(excludeBranchIterator.hasNext())
        {
          DN excludeDN = excludeBranchIterator.next();
          boolean keep = false;
          for(DN includeDN : includeBranches)
          {
            if(includeDN.isAncestorOf(excludeDN))
            {
              keep = true;
              break;
            }
          }
          if(!keep)
          {
            excludeBranchIterator.remove();
          }
        }
        if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
            includeBranches.get(0).equals(baseDN))
        {
          // This entire base DN is explicitly included in the import with
          // no exclude branches that we need to migrate. Just clear the entry
          // container.
          entryContainer.lock();
          entryContainer.clear();
          entryContainer.unlock();
        }
        else
        {
          // Create a temp entry container
          srcEntryContainer = entryContainer;
          entryContainer =
              rootContainer.openEntryContainer(baseDN,
                                               baseDN.toNormalizedString() +
                                                   "_importTmp");
        }
      }
    }
    // Create an import context.
    DNContext DNContext = new DNContext();
    DNContext.setConfig(config);
    DNContext.setLDIFImportConfig(this.ldifImportConfig);
    DNContext.setLDIFReader(reader);
    DNContext.setBaseDN(baseDN);
    DNContext.setEntryContainer(entryContainer);
    DNContext.setSrcEntryContainer(srcEntryContainer);
    //Create queue.
    LinkedBlockingQueue<WorkElement> works =
        new LinkedBlockingQueue<WorkElement>
                     (config.getImportQueueSize());
    DNContext.setWorkQueue(works);
    // Set the include and exclude branches
    DNContext.setIncludeBranches(includeBranches);
    DNContext.setExcludeBranches(excludeBranches);
    return DNContext;
  }
  /**
   * Add specified context and entry to the work queue.
   *
   * @param context The context related to the entry DN.
   * @param entry The entry to work on.
   * @return  <CODE>True</CODE> if the element was added to the work queue.
   */
  private boolean
  addEntryQueue(DNContext context,  Entry entry) {
    WorkElement element =
            WorkElement.decode(entry, context);
    return addQueue(context, element);
  }
  /**
   * Calculate the memory usage for the substring buffer and the DB cache.
   */
  private void calcMemoryLimits() {
    Message msg;
    Runtime runtime = Runtime.getRuntime();
    long freeMemory = runtime.freeMemory();
    long maxMemory = runtime.maxMemory();
    long totMemory = runtime.totalMemory();
    long totFreeMemory = (freeMemory + (maxMemory - totMemory));
    long dbCacheLimit = (totFreeMemory * 40) / 100;
    dbCacheSizeStr = Long.toString(dbCacheLimit);
    totalAvailBufferMemory = (totFreeMemory * 10) / 100;
    if(totalAvailBufferMemory < (10 * minBuffer)) {
       msg =
          INFO_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
                                                      (10 * minBuffer));
      logError(msg);
      totalAvailBufferMemory = (10 * minBuffer);
    }
    msg=INFO_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
                                             totalAvailBufferMemory);
    logError(msg);
  }
  /**
   * Return the string representation of the DB cache size.
   *
   * @return DB cache size string.
   */
  public String getDBCacheSize() {
    return dbCacheSizeStr;
  }
  /**
   * Migrate any existing entries.
   *
   * @throws JebException If a JEB error occurs.
   * @throws DatabaseException  If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExistingEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext context : importMap.values()) {
      EntryContainer srcEntryContainer = context.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !context.getIncludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = INFO_JEB_IMPORT_MIGRATION_START.get(
            "existing", String.valueOf(context.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        try {
          status = cursor.getFirst(key, data, lockMode);
          while(status == OperationStatus.SUCCESS &&
                !ldifImportConfig.isCancelled()) {
            if(threads.size() <= 0) {
              message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
              throw new JebException(message);
            }
            DN dn = DN.decode(new ASN1OctetString(key.getData()));
            if(!context.getIncludeBranches().contains(dn)) {
              EntryID id = new EntryID(data);
              Entry entry = srcEntryContainer.getID2Entry().get(null, id);
              processEntry(context, entry);
              migratedCount++;
              status = cursor.getNext(key, data, lockMode);
            }  else {
              // This is the base entry for a branch that will be included
              // in the import so we don't want to copy the branch to the new
              // entry container.
              /**
               * Advance the cursor to next entry at the same level in the DIT
               * skipping all the entries in this branch.
               * Set the next starting value to a value of equal length but
               * slightly greater than the previous DN. Since keys are compared
               * in reverse order we must set the first byte (the comma).
               * No possibility of overflow here.
               */
              byte[] begin =
                  StaticUtils.getBytes("," + dn.toNormalizedString());
              begin[0] = (byte) (begin[0] + 1);
              key.setData(begin);
              status = cursor.getSearchKeyRange(key, data, lockMode);
            }
          }
        } finally {
          cursor.close();
        }
      }
    }
  }
  /**
   * Migrate excluded entries.
   *
   * @throws JebException If a JEB error occurs.
   * @throws DatabaseException  If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExcludedEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext importContext : importMap.values()) {
      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !importContext.getExcludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = INFO_JEB_IMPORT_MIGRATION_START.get(
            "excluded", String.valueOf(importContext.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        Comparator<byte[]> dn2idComparator =
            srcEntryContainer.getDN2ID().getComparator();
        try {
          for(DN excludedDN : importContext.getExcludeBranches()) {
            byte[] suffix =
                StaticUtils.getBytes(excludedDN.toNormalizedString());
            key.setData(suffix);
            status = cursor.getSearchKeyRange(key, data, lockMode);
            if(status == OperationStatus.SUCCESS &&
                Arrays.equals(key.getData(), suffix)) {
              // This is the base entry for a branch that was excluded in the
              // import so we must migrate all entries in this branch over to
              // the new entry container.
              byte[] end =
                  StaticUtils.getBytes("," + excludedDN.toNormalizedString());
              end[0] = (byte) (end[0] + 1);
              while(status == OperationStatus.SUCCESS &&
                  dn2idComparator.compare(key.getData(), end) < 0 &&
                  !ldifImportConfig.isCancelled()) {
                if(threads.size() <= 0) {
                  message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
                  throw new JebException(message);
                }
                EntryID id = new EntryID(data);
                Entry entry = srcEntryContainer.getID2Entry().get(null, id);
                processEntry(importContext, entry);
                migratedCount++;
                status = cursor.getNext(key, data, lockMode);
              }
            }
          }
        }
        finally
        {
          cursor.close();
        }
      }
    }
  }
  /**
   * This class reports progress of the import job at fixed intervals.
   */
  private final class ProgressTask extends TimerTask
  {
    /**
     * The number of entries that had been read at the time of the
     * previous progress report.
     */
    private long previousCount = 0;
    /**
     * The time in milliseconds of the previous progress report.
     */
    private long previousTime;
    /**
     * The environment statistics at the time of the previous report.
     */
    private EnvironmentStats prevEnvStats;
    /**
     * The number of bytes in a megabyte.
     * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
     */
    public static final int bytesPerMegabyte = 1024*1024;
    //Determines if the ldif is being read.
    private boolean ldifRead = false;
    //Determines if eviction has been detected.
    private boolean evicting = false;
    //Entry count when eviction was detected.
    private long evictionEntryCount = 0;
    //Suspend output.
    private boolean pause = false;
    /**
     * Create a new import progress task.
     * @throws DatabaseException If an error occurs in the JE database.
     */
    public ProgressTask() throws DatabaseException
    {
      previousTime = System.currentTimeMillis();
      prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
    }
    /**
     * Return if reading the LDIF file.
     */
    public void ldifRead() {
      ldifRead=true;
    }
   /**
    * Return value of evicting flag.
    *
    * @return <CODE>True</CODE> if eviction is detected.
    */
    public  boolean isEvicting() {
     return evicting;
    }
    /**
     * Return count of entries when eviction was detected.
     *
     * @return  The entry count when eviction was detected.
     */
    public long getEvictionEntryCount() {
      return evictionEntryCount;
    }
    /**
     * Suspend output if true.
     *
     * @param v The value to set the suspend value to.
     */
    public void setPause(boolean v) {
    pause=v;
   }
    /**
     * The action to be performed by this timer task.
     */
    public void run() {
      long latestCount = reader.getEntriesRead() + 0;
      long deltaCount = (latestCount - previousCount);
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
      Message message;
      if (deltaTime == 0) {
        return;
      }
      if(pause) {
        return;
      }
      if(!ldifRead) {
        long numRead     = reader.getEntriesRead();
        long numIgnored  = reader.getEntriesIgnored();
        long numRejected = reader.getEntriesRejected();
         float rate = 1000f*deltaCount / deltaTime;
         message = INFO_JEB_IMPORT_PROGRESS_REPORT.get(
            numRead, numIgnored, numRejected, 0, rate);
        logError(message);
      }
      try
      {
        Runtime runtime = Runtime.getRuntime();
        long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
        EnvironmentStats envStats =
            rootContainer.getEnvironmentStats(new StatsConfig());
        long nCacheMiss =
            envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
        float cacheMissRate = 0;
        if (deltaCount > 0) {
          cacheMissRate = nCacheMiss/(float)deltaCount;
        }
        message = INFO_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
            freeMemory, cacheMissRate);
        logError(message);
        long evictPasses = envStats.getNEvictPasses();
        long evictNodes = envStats.getNNodesExplicitlyEvicted();
        long evictBinsStrip = envStats.getNBINsStripped();
        int cleanerRuns = envStats.getNCleanerRuns();
        int cleanerDeletions = envStats.getNCleanerDeletions();
        int cleanerEntriesRead = envStats.getNCleanerEntriesRead();
        int cleanerINCleaned = envStats.getNINsCleaned();
        int checkPoints = envStats.getNCheckpoints();
        if(evictPasses != 0) {
          if(!evicting) {
            evicting=true;
            if(!ldifRead) {
              evictionEntryCount=reader.getEntriesRead();
              message =
                 INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
              logError(message);
            }
          }
          message =
                  INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
                          evictNodes, evictBinsStrip);
          logError(message);
        }
        if(cleanerRuns != 0) {
          message = INFO_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
                  cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
          logError(message);
        }
        if(checkPoints  > 1) {
          message = INFO_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
          logError(message);
        }
        prevEnvStats = envStats;
      } catch (DatabaseException e) {
        // Unlikely to happen and not critical.
      }
      previousCount = latestCount;
      previousTime = latestTime;
    }
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java
New file
@@ -0,0 +1,354 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.backends.jeb.EntryID;
import org.opends.server.backends.jeb.JebFormat;
import com.sleepycat.je.dbi.MemoryBudget;
/**
 * An import ID set backed by an array of ints.
 */
public class IntegerImportIDSet implements ImportIDSet {
  //Gleamed from JHAT. The same for 32/64 bit.
  private final static int THIS_OVERHEAD = 17;
  /**
   * The internal array where elements are stored.
   */
  private int[] array = null;
  /**
   * The number of valid elements in the array.
   */
  private int count = 0;
  //Boolean to keep track if the instance is defined or not.
  private boolean isDefined=true;
  /**
   * Create an empty import set.
   */
  public IntegerImportIDSet() {
  }
  /**
   * Create an import set and add the specified entry ID to it.
   *
   * @param id The entry ID.
   */
  public IntegerImportIDSet(EntryID id) {
    this.array = new int[1];
    this.array[0] = (int) id.longValue();
    count=1;
  }
  /**
   * {@inheritDoc}
   */
  public boolean isDefined() {
    return isDefined;
  }
  /**
   * {@inheritDoc}
   */
  public int getMemorySize() {
    if(array != null) {
      return THIS_OVERHEAD + MemoryBudget.byteArraySize(array.length * 4);
    } else {
      return THIS_OVERHEAD;
    }
  }
  /**
   * {@inheritDoc}
   */
  public boolean merge(byte[] dBbytes, ImportIDSet importIdSet, int limit) {
    boolean incrLimitCount=false;
    boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80);
    if(dbUndefined) {
      isDefined=false;
    } else if(!importIdSet.isDefined()) {
      isDefined=false;
      incrLimitCount=true;
    } else {
      array = JebFormat.intArrayFromDatabaseBytes(dBbytes);
      if(array.length + importIdSet.size() > limit) {
        isDefined=false;
        incrLimitCount=true;
        count = 0;
      } else {
        count = array.length;
        addAll((IntegerImportIDSet) importIdSet);
      }
    }
    return incrLimitCount;
  }
  /**
   * Add all of the specified import ID set to the import set.
   *
   * @param that The import ID set to add.
   */
  private  void addAll(IntegerImportIDSet that) {
    resize(this.count+that.count);
    if (that.count == 0)
    {
      return;
    }
    // Optimize for the case where the two sets are sure to have no overlap.
    if (this.count == 0 || that.array[0] > this.array[this.count-1])
    {
      System.arraycopy(that.array, 0, this.array, this.count, that.count);
      count += that.count;
      return;
    }
    if (this.array[0] > that.array[that.count-1])
    {
      System.arraycopy(this.array, 0, this.array, that.count, this.count);
      System.arraycopy(that.array, 0, this.array, 0, that.count);
      count += that.count;
      return;
    }
    int destPos = binarySearch(this.array, this.count, that.array[0]);
    if (destPos < 0)
    {
      destPos = -(destPos+1);
    }
    // Make space for the copy.
    int aCount = this.count - destPos;
    int aPos = destPos + that.count;
    int aEnd = aPos + aCount;
    System.arraycopy(this.array, destPos, this.array, aPos, aCount);
    // Optimize for the case where there is no overlap.
    if (this.array[aPos] > that.array[that.count-1])
    {
      System.arraycopy(that.array, 0, this.array, destPos, that.count);
      count += that.count;
      return;
    }
    int bPos;
    for ( bPos = 0; aPos < aEnd && bPos < that.count; )
    {
      if ( this.array[aPos] < that.array[bPos] )
      {
        this.array[destPos++] = this.array[aPos++];
      }
      else if ( this.array[aPos] > that.array[bPos] )
      {
        this.array[destPos++] = that.array[bPos++];
      }
      else
      {
        this.array[destPos++] = this.array[aPos++];
        bPos++;
      }
    }
    // Copy any remainder.
    int aRemain = aEnd - aPos;
    if (aRemain > 0)
    {
      System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
      destPos += aRemain;
    }
    int bRemain = that.count - bPos;
    if (bRemain > 0)
    {
      System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
      destPos += bRemain;
    }
    count = destPos;
  }
  /**
   * {@inheritDoc}
   */
  public int size() {
    return count;
  }
  /**
   * {@inheritDoc}
   */
  public void addEntryID(EntryID entryID, int limit) {
    if(!isDefined()) {
      return;
    }
    if(isDefined() && ((count + 1) > limit)) {
      isDefined = false;
      array = null;
      count = 0;
    } else {
      add((int)entryID.longValue());
    }
  }
  /**
   * Add the specified integer to the import set.
   *
   * @param v The integer value to add.
   * @return <CODE>True</CODE> if the value was added.
   */
  private boolean add(int v) {
    resize(count+1);
    if (count == 0 || v > array[count-1])
    {
      array[count++] = v;
      return true;
    }
    int pos = binarySearch(array, count, v);
    if (pos >=0)
    {
      return false;
    }
    // For a negative return value r, the index -(r+1) gives the array
    // index at which the specified value can be inserted to maintain
    // the sorted order of the array.
    pos = -(pos+1);
    System.arraycopy(array, pos, array, pos+1, count-pos);
    array[pos] = v;
    count++;
    return true;
  }
  /**
   * Perform binary search for the specified key in the specified array.
   *
   * @param a The array to search in.
   * @param count The max value in the array.
   * @param key The key value.
   * @return Position in array key is found or a negative if it wasn't found.
   */
  private static int binarySearch(int[] a, int count, int key) {
    int low = 0;
    int high = count-1;
    while (low <= high)
    {
      int mid = (low + high) >> 1;
      int midVal = a[mid];
      if (midVal < key)
        low = mid + 1;
      else if (midVal > key)
        high = mid - 1;
      else
        return mid; // key found
    }
    return -(low + 1);  // key not found.
  }
  /**
   * Resize the array to the specified size if needed.
   *
   * @param size The required size.
   */
  private void resize(int size) {
    if (array == null)
    {
      array = new int[size];
    }
    else if (array.length < size)
    {
      // Expand the size of the array in powers of two.
      int newSize = array.length == 0 ? 1 : array.length;
      do
      {
        newSize *= 2;
      } while (newSize < size);
      int[] newBytes = new int[newSize];
      System.arraycopy(array, 0, newBytes, 0, count);
      array = newBytes;
    }
  }
  /**
   * {@inheritDoc}
   */
  public byte[] toDatabase() {
    if(isDefined) {
       return encode(null);
     } else {
       return JebFormat.entryIDUndefinedSizeToDatabase(Long.MAX_VALUE);
     }
   }
  /**
   * Encode the integer array to a byte array suitable for writing to DB.
   *
   * @param bytes The byte array to use in the encoding.
   * @return A byte array suitable to write to DB.
   */
  private byte[] encode(byte[] bytes) {
    int encodedSize = count * 8;
    if (bytes == null || bytes.length < encodedSize) {
      bytes = new byte[encodedSize];
    }
    for (int pos = 0, i = 0; i < count; i++) {
      long v = (long)array[i] & 0x00ffffffffL;
      bytes[pos++] = (byte) ((v >>> 56) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 48) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 40) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 32) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 24) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 16) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 8) & 0xFF);
      bytes[pos++] = (byte) (v & 0xFF);
    }
    return bytes;
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java
New file
@@ -0,0 +1,405 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import com.sleepycat.je.dbi.MemoryBudget;
import org.opends.server.util.RuntimeInformation;
import org.opends.server.backends.jeb.EntryID;
import org.opends.server.backends.jeb.JebFormat;
/**
 * A import ID set backed by an array of longs.
 */
public class LongImportIDSet implements ImportIDSet {
  //Overhead values gleamed from JHAT.
  private final static int LONGS_OVERHEAD;
  private final static int LONGS_OVERHEAD_32 = 25;
  private final static int LONGS_OVERHEAD_64 = 25;
  /**
   * The internal array where elements are stored.
   */
  private long[] array = null;
  /**
   * The number of valid elements in the array.
   */
  private int count = 0;
  //Boolean to keep track if the instance is defined or not.
  boolean isDefined=true;
  static {
    if(RuntimeInformation.is64Bit()) {
      LONGS_OVERHEAD = LONGS_OVERHEAD_64;
    } else {
      LONGS_OVERHEAD = LONGS_OVERHEAD_32;
    }
  }
  /**
   * Create an empty instance.
   */
  public LongImportIDSet() {
  }
  /**
   * Create instance and add specified entry ID to the set.
   *
   * @param id The entry ID.
   */
  public LongImportIDSet(EntryID id) {
     this.array = new long[1];
     this.array[0] = id.longValue();
     count=1;
   }
  /**
   * {@inheritDoc}
   */
   public boolean isDefined() {
    return isDefined;
  }
  /**
   * {@inheritDoc}
   */
  public int getMemorySize() {
    if(array != null) {
      return LONGS_OVERHEAD + MemoryBudget.byteArraySize(array.length * 8);
    } else {
      return LONGS_OVERHEAD;
    }
  }
  /**
   * {@inheritDoc}
   */
  public boolean merge(byte[] DBbytes, ImportIDSet importIdSet, int limit) {
    boolean incrLimitCount=false;
    boolean dbUndefined = ((DBbytes[0] & 0x80) == 0x80);
    if(dbUndefined) {
      isDefined=false;
    } else if(!importIdSet.isDefined()) {
      isDefined=false;
      incrLimitCount=true;
    } else {
      array = JebFormat.entryIDListFromDatabase(DBbytes);
      if(array.length + importIdSet.size() > limit) {
          isDefined=false;
          incrLimitCount=true;
      } else {
        count = array.length;
        addAll((LongImportIDSet) importIdSet);
      }
    }
    return incrLimitCount;
  }
  /**
   * {@inheritDoc}
   */
  public void addEntryID(EntryID entryID, int limit) {
    if(!isDefined()) {
       return;
    }
    if(isDefined() && ((count + 1) > limit)) {
      isDefined = false;
      array = null;
      count = 0;
    } else {
       add(entryID.longValue());
    }
  }
  /**
   * {@inheritDoc}
   */
  public byte[] toDatabase() {
    if (isDefined()) return encode(null);
    else return JebFormat.entryIDUndefinedSizeToDatabase(Long.MAX_VALUE);
  }
  /**
   * Decodes a set from a byte array.
   * @param bytes The encoded value.
   */
  void decode(byte[] bytes)
  {
    if (bytes == null)
    {
      count = 0;
      return;
    }
    int count = bytes.length / 8;
    resize(count);
    for (int pos = 0, i = 0; i < count; i++)
    {
      long v = 0;
      v |= (bytes[pos++] & 0xFFL) << 56;
      v |= (bytes[pos++] & 0xFFL) << 48;
      v |= (bytes[pos++] & 0xFFL) << 40;
      v |= (bytes[pos++] & 0xFFL) << 32;
      v |= (bytes[pos++] & 0xFFL) << 24;
      v |= (bytes[pos++] & 0xFFL) << 16;
      v |= (bytes[pos++] & 0xFFL) << 8;
      v |= (bytes[pos++] & 0xFFL);
      array[i] = v;
    }
    this.count = count;
  }
  /**
   * Encode this value into a byte array.
   * @param bytes The array into which the value will be encoded.  If the
   * provided array is null, or is not big enough, a new array will be
   * allocated.
   * @return The encoded array. If the provided array was bigger than needed
   * to encode the value then the provided array is returned and the number
   * of bytes of useful data is given by the encodedSize method.
   */
  byte[] encode(byte[] bytes) {
    int encodedSize = count * 8;
    if (bytes == null || bytes.length < encodedSize)
    {
      bytes = new byte[encodedSize];
    }
    for (int pos = 0, i = 0; i < count; i++)
    {
      long v = array[i];
      bytes[pos++] = (byte) ((v >>> 56) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 48) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 40) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 32) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 24) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 16) & 0xFF);
      bytes[pos++] = (byte) ((v >>> 8) & 0xFF);
      bytes[pos++] = (byte) (v & 0xFF);
    }
    return bytes;
  }
  /**
   * This is very much like Arrays.binarySearch except that it searches only
   * an initial portion of the provided array.
   * @param a The array to be searched.
   * @param count The number of initial elements in the array to be searched.
   * @param key The element to search for.
   * @return See Arrays.binarySearch.
   */
  private static int binarySearch(long[] a, int count, long key) {
    int low = 0;
    int high = count-1;
    while (low <= high)
    {
      int mid = (low + high) >> 1;
      long midVal = a[mid];
      if (midVal < key)
        low = mid + 1;
      else if (midVal > key)
        high = mid - 1;
      else
        return mid; // key found
    }
    return -(low + 1);  // key not found.
  }
  /**
   * Add a new value to the set.
   * @param v The value to be added.
   * @return true if the value was added, false if it was already present
   * in the set.
   */
  private boolean add(long v) {
    resize(count+1);
    if (count == 0 || v > array[count-1])
    {
      array[count++] = v;
      return true;
    }
    int pos = binarySearch(array, count, v);
    if (pos >=0)
    {
      return false;
    }
    // For a negative return value r, the index -(r+1) gives the array
    // index at which the specified value can be inserted to maintain
    // the sorted order of the array.
    pos = -(pos+1);
    System.arraycopy(array, pos, array, pos+1, count-pos);
    array[pos] = v;
    count++;
    return true;
  }
  /**
   * Adds all the elements of a provided set to this set if they are not
   * already present.
   * @param that The set of elements to be added.
   */
  private void addAll(LongImportIDSet that) {
    resize(this.count+that.count);
    if (that.count == 0)
    {
      return;
    }
    // Optimize for the case where the two sets are sure to have no overlap.
    if (this.count == 0 || that.array[0] > this.array[this.count-1])
    {
      System.arraycopy(that.array, 0, this.array, this.count, that.count);
      count += that.count;
      return;
    }
    if (this.array[0] > that.array[that.count-1])
    {
      System.arraycopy(this.array, 0, this.array, that.count, this.count);
      System.arraycopy(that.array, 0, this.array, 0, that.count);
      count += that.count;
      return;
    }
    int destPos = binarySearch(this.array, this.count, that.array[0]);
    if (destPos < 0)
    {
      destPos = -(destPos+1);
    }
    // Make space for the copy.
    int aCount = this.count - destPos;
    int aPos = destPos + that.count;
    int aEnd = aPos + aCount;
    System.arraycopy(this.array, destPos, this.array, aPos, aCount);
    // Optimize for the case where there is no overlap.
    if (this.array[aPos] > that.array[that.count-1])
    {
      System.arraycopy(that.array, 0, this.array, destPos, that.count);
      count += that.count;
      return;
    }
    int bPos;
    for ( bPos = 0; aPos < aEnd && bPos < that.count; )
    {
      if ( this.array[aPos] < that.array[bPos] )
      {
        this.array[destPos++] = this.array[aPos++];
      }
      else if ( this.array[aPos] > that.array[bPos] )
      {
        this.array[destPos++] = that.array[bPos++];
      }
      else
      {
        this.array[destPos++] = this.array[aPos++];
        bPos++;
      }
    }
    // Copy any remainder.
    int aRemain = aEnd - aPos;
    if (aRemain > 0)
    {
      System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
      destPos += aRemain;
    }
    int bRemain = that.count - bPos;
    if (bRemain > 0)
    {
      System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
      destPos += bRemain;
    }
    count = destPos;
  }
  /**
   * {@inheritDoc}
   */
  public int size() {
    return count;
  }
  /**
   * Ensures capacity of the internal array for a given number of elements.
   * @param size The internal array will be guaranteed to be at least this
   * size.
   */
  private void resize(int size) {
    if (array == null)
    {
      array = new long[size];
    }
    else if (array.length < size)
    {
      // Expand the size of the array in powers of two.
      int newSize = array.length == 0 ? 1 : array.length;
      do
      {
        newSize *= 2;
      } while (newSize < size);
      long[] newBytes = new long[newSize];
      System.arraycopy(array, 0, newBytes, 0, count);
      array = newBytes;
    }
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java
New file
@@ -0,0 +1,104 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.types.Entry;
/**
 * A work element passed on the work queue.
 */
public class WorkElement {
  //The entry to import.
  private Entry entry;
  //Used in replace mode, this is the entry to replace.
  private Entry existingEntry;
  //The context related to the entry.
  private DNContext context;
  /**
   * Create a work element instance.
   *
   * @param entry The entry to import.
   * @param context The context related to the entry.
   */
  private WorkElement(Entry entry, DNContext context )  {
    this.entry = entry;
    this.context = context;
  }
  /**
   * Static to create an work element.
   *
   * @param entry The entry to import.
   * @param context The context related to the entry.
   * @return  A work element to put on the queue.
   */
  public static
  WorkElement decode(Entry entry, DNContext context ) {
    return new WorkElement(entry, context);
  }
  /**
   * Return the entry to import.
   *
   * @return  The entry to import.
   */
  public Entry getEntry() {
    return entry;
  }
  /**
   * Return the context related to the entry.
   *
   * @return The context.
   */
  public DNContext getContext() {
    return context;
  }
  /**
   * Return an existing entry, used during replace mode.
   *
   * @return An existing entry.
   */
  public Entry getExistingEntry() {
    return existingEntry;
  }
  /**
   * Set the existing entry.
   *
   * @param existingEntry The existing entry to set.
   */
  public void setExistingEntry(Entry existingEntry) {
    this.existingEntry = existingEntry;
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
New file
@@ -0,0 +1,457 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb.importLDIF;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.*;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.jeb.*;
import org.opends.messages.Message;
import static org.opends.messages.JebMessages.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.*;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
/**
 * A thread to process import entries from a queue.  Multiple instances of
 * this class process entries from a single shared queue.
 */
public class WorkThread extends DirectoryThread {
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /*
   * Work queue of work items.
   */
  private BlockingQueue<WorkElement> workQueue;
  /**
   * The number of entries imported by this thread.
   */
  private int importedCount = 0;
  //Root container.
  private RootContainer rootContainer;
  /**
   * A flag that is set when the thread has been told to stop processing.
   */
  private boolean stopRequested = false;
  //The thread number related to a thread.
  private int threadNumber;
  //The substring buffer manager to use.
  private BufferManager bufferMgr;
  //Flag set when substring buffer should be flushed.
  private boolean flushBuffer = true;
  /**
   * Create a work thread instance using the specified parameters.
   *
   * @param workQueue  The work queue to pull work off of.
   * @param threadNumber The thread number.
   * @param bufferMgr  The buffer manager to use.
   * @param rootContainer The root container.
   */
  public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber,
                                BufferManager bufferMgr,
                                RootContainer rootContainer) {
    super("Import Worker Thread " + threadNumber);
    this.threadNumber = threadNumber;
    this.workQueue = workQueue;
    this.bufferMgr = bufferMgr;
    this.rootContainer = rootContainer;
  }
  /**
   * Get the number of entries imported by this thread.
   * @return The number of entries imported by this thread.
   */
   int getImportedCount() {
    return importedCount;
  }
  /**
   * Tells the thread to stop processing.
   */
   void stopProcessing() {
    stopRequested = true;
  }
  /**
   * Tells thread to flush substring buffer.
   *
   * @param flush Set to false if substring flush should be skipped.
   */
   void setFlush(boolean flush) {
    this.flushBuffer = flush;
  }
  /**
   * Run the thread. Read from item from queue and give it to the
   * buffer manage, unless told to stop. Once stopped, ask buffer manager
   * to flush and exit.
   *
   */
  public void run()
  {
    try {
      do {
        try {
          WorkElement element = workQueue.poll(1000, TimeUnit.MILLISECONDS);
          if(element != null) {
           process(element);
          }
        }
        catch (InterruptedException e) {
          if (debugEnabled()) {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      } while (!stopRequested);
      if(flushBuffer) {
        bufferMgr.flushAll(threadNumber);
      }
    } catch (Exception e) {
      if (debugEnabled()) {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      throw new RuntimeException(e);
    }
  }
  /**
   * Process a work element.
   *
   * @param element The work elemenet to process.
   *
   * @throws DatabaseException If a database error occurs.
   * @throws DirectoryException If a directory error occurs.
   * @throws JebException If a JEB error occurs.
   */
  private void process(WorkElement element)
  throws DatabaseException, DirectoryException, JebException {
    Transaction txn = null;
    EntryID entryID;
    if((entryID = processDN2ID(element, txn)) == null)
      return;
    if(!processParent(element, entryID, txn))
      return;
    if(!processID2Entry(element, entryID, txn))
      return;
    procesID2SCEntry(element, entryID, txn);
    processIndexesEntry(element, entryID, txn);
  }
  /**
   * Delete all indexes related to the specified entry ID using the specified
   * entry to generate the keys.
   *
   * @param element The work element.
   * @param existingEntry The existing entry to replace.
   * @param entryID The entry ID to remove from the keys.
   * @param txn A transaction.
   * @throws DatabaseException If a database error occurs.
   */
  private void
  processIndexesEntryDelete(WorkElement element, Entry existingEntry,
                            EntryID entryID, Transaction txn)
          throws DatabaseException {
    DNContext context = element.getContext();
    Map<AttributeType, AttributeIndex> attrIndexMap =
            context.getAttrIndexMap();
    for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
            attrIndexMap.entrySet()) {
      AttributeType attrType = mapEntry.getKey();
      if(existingEntry.hasAttribute(attrType)) {
        AttributeIndex attributeIndex = mapEntry.getValue();
        Index index;
        if((index=attributeIndex.getEqualityIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
        }
        if((index=attributeIndex.getPresenceIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
        }
        if((index=attributeIndex.getSubstringIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
        }
        if((index=attributeIndex.getOrderingIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
        }
        if((index=attributeIndex.getApproximateIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
        }
      }
    }
  }
  /**
   * Process all indexes using the specified entry ID.
   *
   * @param element The work element.
   * @param entryID The entry ID to process.
   * @param txn A transaction.
   * @throws DatabaseException If an database error occurs.
   */
  private void
  processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn)
          throws DatabaseException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
    if (ldifImportConfig.appendToExistingData() &&
            ldifImportConfig.replaceExistingEntries()) {
      Entry existingEntry = element.getExistingEntry();
      processIndexesEntryDelete(element, existingEntry, entryID, txn);
    }
    Map<AttributeType, AttributeIndex> attrIndexMap =
            context.getAttrIndexMap();
    for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
            attrIndexMap.entrySet()) {
      AttributeType attrType = mapEntry.getKey();
      if(entry.hasAttribute(attrType)) {
        AttributeIndex attributeIndex = mapEntry.getValue();
        Index index;
        if((index=attributeIndex.getEqualityIndex()) != null) {
          insert(index, entry, entryID, txn);
        }
        if((index=attributeIndex.getPresenceIndex()) != null) {
          insert(index, entry, entryID, txn);
        }
        if((index=attributeIndex.getSubstringIndex()) != null) {
          bufferMgr.insert(index,entry, entryID, txn);
        }
        if((index=attributeIndex.getOrderingIndex()) != null) {
          insert(index, entry, entryID, txn);
        }
        if((index=attributeIndex.getApproximateIndex()) != null) {
          insert(index, entry, entryID, txn);
        }
      }
    }
  }
  /**
   * Process id2children/id2subtree indexes for the specified entry ID.
   *
   * @param element The work element.
   * @param entryID The entry ID to process.
   * @param txn A transaction.
   * @throws DatabaseException If an database error occurs.
   */
  private  void
  procesID2SCEntry(WorkElement element, EntryID entryID,
                   Transaction txn) throws DatabaseException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
    if (ldifImportConfig.appendToExistingData() &&
            ldifImportConfig.replaceExistingEntries()) {
      return;
    }
    Index id2children = context.getEntryContainer().getID2Children();
    Index id2subtree = context.getEntryContainer().getID2Subtree();
    insert(id2children, entry, entryID, txn);
    insert(id2subtree, entry, entryID, txn);
  }
  /**
   * Insert specified entry ID into the specified index using the entry to
   * generate the keys.
   *
   * @param index  The index to insert into.
   * @param entry The entry to generate the keys from.
   * @param entryID The entry ID to insert.
   * @param txn A transaction.
   * @return <CODE>True</CODE> if insert succeeded.
   * @throws DatabaseException If a database error occurs.
   */
  private boolean
  insert(Index index, Entry entry, EntryID entryID,
         Transaction txn) throws DatabaseException {
    Set<byte[]> keySet = new HashSet<byte[]>();
    index.indexer.indexEntry(txn, entry, keySet);
    return index.insert(txn, keySet,  entryID);
  }
  /**
   * Delete specified entry ID into the specified index using the entry to
   * generate the keys.
   *
   * @param index  The index to insert into.
   * @param entry The entry to generate the keys from.
   * @param entryID The entry ID to insert.
   * @param txn A transaction.
   * @throws DatabaseException If a database error occurs.
   */
  private void
  delete(Index index, Entry entry, EntryID entryID,
         Transaction txn) throws DatabaseException {
    Set<byte[]> keySet = new HashSet<byte[]>();
    index.indexer.indexEntry(txn, entry, keySet);
    index.delete(txn, keySet,  entryID);
  }
  /**
   * Insert entry from work element into id2entry DB.
   *
   * @param element The work element containing the entry.
   * @param entryID The entry ID to use as the key.
   * @param txn A transaction.
   * @return <CODE>True</CODE> If the insert succeeded.
   * @throws DatabaseException If a database error occurs.
   * @throws DirectoryException  If a directory error occurs.
   */
  private boolean
  processID2Entry(WorkElement element, EntryID entryID, Transaction txn)
          throws DatabaseException, DirectoryException {
    boolean ret;
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    ID2Entry id2entry = context.getEntryContainer().getID2Entry();
    DN2URI dn2uri = context.getEntryContainer().getDN2URI();
    ret=id2entry.put(txn, entryID, entry);
    if(ret) {
      importedCount++;
      LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
      if (ldifImportConfig.appendToExistingData() &&
              ldifImportConfig.replaceExistingEntries()) {
        Entry existingEntry = element.getExistingEntry();
        dn2uri.replaceEntry(txn, existingEntry, entry);
      } else {
        ret= dn2uri.addEntry(txn, entry);
      }
    }
    return ret;
  }
  /**
   * Process entry from work element checking if it's parent exists.
   *
   * @param element The work element containing the entry.
   * @param entryID The entry ID to use as the key.
   * @param txn A transaction.
   * @return <CODE>True</CODE> If the insert succeeded.
   * @throws DatabaseException If a database error occurs.
   */
  private boolean
  processParent(WorkElement element, EntryID entryID, Transaction txn)
          throws DatabaseException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
    if (ldifImportConfig.appendToExistingData() &&
            ldifImportConfig.replaceExistingEntries()) {
      return true;
    }
    EntryID parentID = null;
    DN entryDN = entry.getDN();
    DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN);
    DN2ID dn2id = context.getEntryContainer().getDN2ID();
    if (parentDN != null) {
      parentID = context.getParentID(parentDN, dn2id, txn);
      if (parentID == null) {
        dn2id.remove(txn, entryDN);
        Message msg =
                ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
        context.getLDIFReader().rejectLastEntry(msg);
        return false;
      }
    }
    ArrayList<EntryID> IDs;
    if (parentDN != null && context.getParentDN() != null &&
            parentDN.equals(context.getParentDN())) {
      IDs = new ArrayList<EntryID>(context.getIDs());
      IDs.set(0, entryID);
    }
    else {
      IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
      IDs.add(entryID);
      if (parentID != null)
      {
        IDs.add(parentID);
        EntryContainer ec = context.getEntryContainer();
        for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
             dn = ec.getParentWithinBase(dn)) {
          EntryID nodeID = dn2id.get(txn, dn);
          IDs.add(nodeID);
        }
      }
    }
    context.setParentDN(parentDN);
    context.setIDs(IDs);
    entry.setAttachment(IDs);
    return true;
  }
  /**
   * Process the a entry from the work element into the dn2id DB.
   *
   * @param element The work element containing the entry.
   * @param txn A transaction.
   * @return An entry ID.
   * @throws DatabaseException If a database error occurs.
   * @throws JebException If a JEB error occurs.
   */
  private EntryID
  processDN2ID(WorkElement element, Transaction txn)
          throws DatabaseException, JebException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    DN2ID dn2id = context.getEntryContainer().getDN2ID();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
    DN entryDN = entry.getDN();
    EntryID entryID = dn2id.get(txn, entryDN);
    if (entryID != null) {
      if (ldifImportConfig.appendToExistingData() &&
              ldifImportConfig.replaceExistingEntries()) {
        ID2Entry id2entry = context.getEntryContainer().getID2Entry();
        Entry existingEntry = id2entry.get(txn, entryID);
        element.setExistingEntry(existingEntry);
      } else {
        Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
        context.getLDIFReader().rejectLastEntry(msg);
        entryID = null;
      }
    } else {
      entryID = rootContainer.getNextEntryID();
      dn2id.insert(txn, entryDN, entryID);
    }
    context.removePending(entryDN);
    return entryID;
  }
}
opends/tests/unit-tests-testng/resource/config-changes.ldif
@@ -422,10 +422,7 @@
ds-cfg-index-entry-limit: 1
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
ds-cfg-import-temp-directory: import-tmp
ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -659,10 +656,7 @@
ds-cfg-index-entry-limit: 10
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
ds-cfg-import-temp-directory: importTmp
ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -836,10 +830,7 @@
ds-cfg-index-entry-limit: 10
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
ds-cfg-import-temp-directory: importTmp
ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -1008,10 +999,7 @@
ds-cfg-index-entry-limit: 10
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
ds-cfg-import-temp-directory: importTmp
ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -1214,10 +1202,7 @@
ds-cfg-index-entry-limit: 13
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
ds-cfg-import-temp-directory: importTmp
ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10