From acf59627f06a7d061b3da8f61c60f6d2dc8d7a8d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 20 May 2015 12:11:06 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

---
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java  |    8 
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 1398 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 1,402 insertions(+), 4 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
index 96a0a74..c3c2cac 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
@@ -319,9 +319,9 @@
     this.dnCache = null;
   }
 
-  @SuppressWarnings("javadoc")
-  OnDiskMergeBufferImporter(RootContainer rootContainer, LDIFImportConfig importCfg, PluggableBackendCfg backendCfg,
-      ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException
+  private OnDiskMergeBufferImporter(RootContainer rootContainer, LDIFImportConfig importCfg,
+      PluggableBackendCfg backendCfg, ServerContext serverContext)
+      throws InitializationException, ConfigException, StorageRuntimeException
   {
     this.rootContainer = rootContainer;
     this.rebuildManager = null;
@@ -3230,7 +3230,7 @@
      * @param entryLimit
      *          The entry limit for the index.
      */
-    private IndexKey(AttributeType attributeType, String indexID, int entryLimit)
+    IndexKey(AttributeType attributeType, String indexID, int entryLimit)
     {
       this.attributeType = attributeType;
       this.indexID = indexID;
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
new file mode 100644
index 0000000..005b07c
--- /dev/null
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -0,0 +1,1398 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2008-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011-2015 ForgeRock AS
+ */
+package org.opends.server.backends.pluggable;
+
+import static org.opends.messages.BackendMessages.*;
+import static org.opends.server.backends.pluggable.DnKeyFormat.*;
+import static org.opends.server.core.DirectoryServer.*;
+import static org.opends.server.util.DynamicConstants.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
+import org.opends.server.admin.std.server.BackendIndexCfg;
+import org.opends.server.admin.std.server.PersistitBackendCfg;
+import org.opends.server.admin.std.server.PluggableBackendCfg;
+import org.opends.server.backends.persistit.PersistItStorage;
+import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
+import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
+import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.DNCache;
+import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.IndexKey;
+import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.ReadOperation;
+import org.opends.server.backends.pluggable.spi.ReadableTransaction;
+import org.opends.server.backends.pluggable.spi.Storage;
+import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
+import org.opends.server.backends.pluggable.spi.TreeName;
+import org.opends.server.backends.pluggable.spi.UpdateFunction;
+import org.opends.server.backends.pluggable.spi.WriteOperation;
+import org.opends.server.backends.pluggable.spi.WriteableTransaction;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ServerContext;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.LDIFImportResult;
+import org.opends.server.util.Platform;
+
+/**
+ * This class provides the engine that performs both importing of LDIF files and
+ * the rebuilding of indexes.
+ */
+final class OnDiskMergeStorageImporter
+{
+  /**
+   * Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting
+   * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
+   */
+  @SuppressWarnings("javadoc")
+  static final class StrategyImpl implements ImportStrategy
+  {
+    private final PluggableBackendCfg backendCfg;
+
+    StrategyImpl(PluggableBackendCfg backendCfg)
+    {
+      this.backendCfg = backendCfg;
+    }
+
+    @Override
+    public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer,
+        ServerContext serverContext) throws DirectoryException, InitializationException
+    {
+      try
+      {
+        return new OnDiskMergeStorageImporter(rootContainer, importConfig, backendCfg, serverContext).processImport();
+      }
+      catch (DirectoryException | InitializationException e)
+      {
+        logger.traceException(e);
+        throw e;
+      }
+      catch (ConfigException e)
+      {
+        logger.traceException(e);
+        throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+      }
+      catch (Exception e)
+      {
+        logger.traceException(e);
+        throw new DirectoryException(getServerErrorResultCode(),
+            LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
+      }
+    }
+  }
+
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+  private static final int TIMER_INTERVAL = 10000;
+  private static final String DEFAULT_TMP_DIR = "import-tmp";
+  private static final String DN_CACHE_DIR = "dn-cache";
+
+  /** Defaults for DB cache. */
+  private static final int MAX_DB_CACHE_SIZE = 8 * MB;
+  private static final int MAX_DB_LOG_SIZE = 10 * MB;
+  private static final int MIN_DB_CACHE_SIZE = 4 * MB;
+
+  /**
+   * Defaults for LDIF reader buffers, min memory required to import and default
+   * size for byte buffers.
+   */
+  private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
+  private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE + MAX_DB_LOG_SIZE;
+
+  /** Max size of phase one buffer. */
+  private static final int MAX_BUFFER_SIZE = 2 * MB;
+  /** Min size of phase one buffer. */
+  private static final int MIN_BUFFER_SIZE = 4 * KB;
+  /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
+  private static final int SMALL_HEAP_SIZE = 256 * MB;
+
+  /** Root container. */
+  private final RootContainer rootContainer;
+  /** Import configuration. */
+  private final LDIFImportConfig importCfg;
+  private final ServerContext serverContext;
+
+  /** LDIF reader. */
+  private ImportLDIFReader reader;
+  /** Phase one imported entries count. */
+  private final AtomicLong importCount = new AtomicLong(0);
+  /** Migrated entry count. */
+  private int migratedCount;
+
+  /** Phase one buffer size in bytes. */
+  private int bufferSize;
+  /** Index count. */
+  private final int indexCount;
+  /** Thread count. */
+  private int threadCount;
+
+  /** Whether DN validation should be performed. If true, then it is performed during phase one. */
+  private final boolean validateDNs;
+
+  /** Temp scratch directory. */
+  private final File tempDir;
+  /** DN cache used when DN validation is done in first phase. */
+  private final DNCache dnCache;
+  /** Size in bytes of DN cache. */
+  private long dnCacheSize;
+  /** Available memory at the start of the import. */
+  private long availableMemory;
+  /** Size in bytes of DB cache. */
+  private long dbCacheSize;
+
+  /** Map of DNs to Suffix objects. */
+  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<>();
+
+  /** Set to true if the backend was cleared. */
+  private final boolean clearedBackend;
+
+  /** Used to shutdown import if an error occurs in phase one. */
+  private volatile boolean isCanceled;
+
+  /** Number of phase one buffers. */
+  private int phaseOneBufferCount;
+
+  private OnDiskMergeStorageImporter(RootContainer rootContainer, LDIFImportConfig importCfg,
+      PluggableBackendCfg backendCfg, ServerContext serverContext)
+      throws InitializationException, ConfigException, StorageRuntimeException
+  {
+    this.rootContainer = rootContainer;
+    this.importCfg = importCfg;
+    this.serverContext = serverContext;
+
+    if (importCfg.getThreadCount() == 0)
+    {
+      this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
+    }
+    else
+    {
+      this.threadCount = importCfg.getThreadCount();
+    }
+
+    // Determine the number of indexes.
+    this.indexCount = getTotalIndexCount(backendCfg);
+
+    this.clearedBackend = mustClearBackend(importCfg, backendCfg);
+
+    validateDNs = !importCfg.getSkipDNValidation();
+    this.tempDir = prepareTempDir(backendCfg, importCfg.getTmpDirectory());
+    // be careful: requires that a few data has been set
+    computeMemoryRequirements();
+
+    if (validateDNs)
+    {
+      final File dnCachePath = new File(tempDir, DN_CACHE_DIR);
+      dnCachePath.mkdirs();
+      this.dnCache = new DNCacheImpl(dnCachePath);
+    }
+    else
+    {
+      this.dnCache = null;
+    }
+  }
+
+  private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException
+  {
+    File parentDir = getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR);
+    File tempDir = new File(parentDir, backendCfg.getBackendId());
+    recursiveDelete(tempDir);
+    if (!tempDir.exists() && !tempDir.mkdirs())
+    {
+      throw new InitializationException(ERR_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
+    }
+    return tempDir;
+  }
+
+  /**
+   * Returns whether the backend must be cleared.
+   *
+   * @param importCfg
+   *          the import configuration object
+   * @param backendCfg
+   *          the backend configuration object
+   * @return true if the backend must be cleared, false otherwise
+   * @see #prepareSuffix(WriteableTransaction, EntryContainer) for per-suffix cleanups.
+   */
+  private static boolean mustClearBackend(LDIFImportConfig importCfg, PluggableBackendCfg backendCfg)
+  {
+    return !importCfg.appendToExistingData()
+        && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+    /*
+     * Why do we clear when there is only one baseDN?
+     * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
+     * so if there is only one baseDN for this backend, then clear it now.
+     */
+  }
+
+  private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException
+  {
+    int indexes = 2; // dn2id, dn2uri
+    for (String indexName : backendCfg.listBackendIndexes())
+    {
+      BackendIndexCfg index = backendCfg.getBackendIndex(indexName);
+      SortedSet<IndexType> types = index.getIndexType();
+      if (types.contains(IndexType.EXTENSIBLE))
+      {
+        indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
+      }
+      else
+      {
+        indexes += types.size();
+      }
+    }
+    return indexes;
+  }
+
+  /**
+   * Calculate buffer sizes and initialize properties based on memory.
+   *
+   * @throws InitializationException
+   *           If a problem occurs during calculation.
+   */
+  private void computeMemoryRequirements() throws InitializationException
+  {
+    // Calculate amount of usable memory. This will need to take into account
+    // various fudge factors, including the number of IO buffers used by the
+    // scratch writers (1 per index).
+    calculateAvailableMemory();
+
+    final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
+
+    // We need caching when doing DN validation
+    if (validateDNs)
+    {
+      // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
+      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
+      {
+        dbCacheSize = 500 * KB;
+        dnCacheSize = 500 * KB;
+      }
+      else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
+      {
+        dbCacheSize = MIN_DB_CACHE_SIZE;
+        dnCacheSize = MIN_DB_CACHE_SIZE;
+      }
+      else if (!clearedBackend)
+      {
+        // Appending to existing data so reserve extra memory for the DB cache
+        // since it will be needed for dn2id queries.
+        dbCacheSize = usableMemory * 33 / 100;
+        dnCacheSize = usableMemory * 33 / 100;
+      }
+      else
+      {
+        dbCacheSize = MAX_DB_CACHE_SIZE;
+        dnCacheSize = usableMemory * 66 / 100;
+      }
+    }
+    else
+    {
+      // No DN validation: calculate memory for DB cache and buffers.
+
+      // No need for DN2ID cache.
+      dnCacheSize = 0;
+
+      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
+      {
+        dbCacheSize = 500 * KB;
+      }
+      else if (usableMemory < MIN_DB_CACHE_MEMORY)
+      {
+        dbCacheSize = MIN_DB_CACHE_SIZE;
+      }
+      else
+      {
+        // No need to differentiate between append/clear backend, since dn2id is
+        // not being queried.
+        dbCacheSize = MAX_DB_CACHE_SIZE;
+      }
+    }
+
+    final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize;
+    final int oldThreadCount = threadCount;
+    if (indexCount != 0) // Avoid / by zero
+    {
+      while (true)
+      {
+        phaseOneBufferCount = 2 * indexCount * threadCount;
+
+        // Scratch writers allocate 4 buffers per index as well.
+        final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount);
+        long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount;
+        // We need (2 * bufferSize) to fit in an int for the insertByteStream
+        // and deleteByteStream constructors.
+        bufferSize = (int) Math.min(longBufferSize, Integer.MAX_VALUE / 2);
+
+        if (bufferSize > MAX_BUFFER_SIZE)
+        {
+          if (validateDNs)
+          {
+            // The buffers are big enough: the memory is best used for the DN2ID temp DB
+            bufferSize = MAX_BUFFER_SIZE;
+
+            final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
+            if (!clearedBackend)
+            {
+              dbCacheSize += extraMemory / 2;
+              dnCacheSize += extraMemory / 2;
+            }
+            else
+            {
+              dnCacheSize += extraMemory;
+            }
+          }
+
+          break;
+        }
+        else if (bufferSize > MIN_BUFFER_SIZE)
+        {
+          // This is acceptable.
+          break;
+        }
+        else if (threadCount > 1)
+        {
+          // Retry using less threads.
+          threadCount--;
+        }
+        else
+        {
+          // Not enough memory.
+          final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
+          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(
+              usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize));
+        }
+      }
+    }
+
+    if (oldThreadCount != threadCount)
+    {
+      logger.info(NOTE_IMPORT_ADJUST_THREAD_COUNT, oldThreadCount, threadCount);
+    }
+
+    logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
+    if (dnCacheSize > 0)
+    {
+      logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize);
+    }
+    logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
+  }
+
+  /**
+   * Calculates the amount of available memory which can be used by this import,
+   * taking into account whether or not the import is running offline or online
+   * as a task.
+   */
+  private void calculateAvailableMemory()
+  {
+    final long totalAvailableMemory;
+    if (DirectoryServer.isRunning())
+    {
+      // Online import/rebuild.
+      final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
+      totalAvailableMemory = Math.max(availableMemory, 16 * MB);
+    }
+    else
+    {
+      // Offline import/rebuild.
+      totalAvailableMemory = Platform.getUsableMemoryForCaching();
+    }
+
+    // Now take into account various fudge factors.
+    int importMemPct = 90;
+    if (totalAvailableMemory <= SMALL_HEAP_SIZE)
+    {
+      // Be pessimistic when memory is low.
+      importMemPct -= 25;
+    }
+
+    availableMemory = totalAvailableMemory * importMemPct / 100;
+  }
+
+  private void initializeSuffixes(WriteableTransaction txn) throws ConfigException, DirectoryException
+  {
+    for (EntryContainer ec : rootContainer.getEntryContainers())
+    {
+      Suffix suffix = getSuffix(txn, ec);
+      if (suffix != null)
+      {
+        dnSuffixMap.put(ec.getBaseDN(), suffix);
+      }
+    }
+  }
+
+  private Suffix getSuffix(WriteableTransaction txn, EntryContainer entryContainer)
+      throws ConfigException, DirectoryException
+  {
+    if (importCfg.appendToExistingData() || importCfg.clearBackend())
+    {
+      return new Suffix(entryContainer);
+    }
+
+    final DN baseDN = entryContainer.getBaseDN();
+    if (importCfg.getExcludeBranches().contains(baseDN))
+    {
+      // This entire base DN was explicitly excluded. Skip.
+      return null;
+    }
+
+    EntryContainer sourceEntryContainer = null;
+    List<DN> excludeBranches = getDescendants(baseDN, importCfg.getExcludeBranches());
+    List<DN> includeBranches = null;
+    if (!importCfg.getIncludeBranches().isEmpty())
+    {
+      includeBranches = getDescendants(baseDN, importCfg.getIncludeBranches());
+      if (includeBranches.isEmpty())
+      {
+        // There are no branches in the explicitly defined include list under this base DN.
+        // Skip this base DN altogether.
+        return null;
+      }
+
+      // Remove any overlapping include branches.
+      Iterator<DN> includeBranchIterator = includeBranches.iterator();
+      while (includeBranchIterator.hasNext())
+      {
+        DN includeDN = includeBranchIterator.next();
+        if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN))
+        {
+          includeBranchIterator.remove();
+        }
+      }
+
+      // Remove any exclude branches that are not are not under a include branch
+      // since they will be migrated as part of the existing entries
+      // outside of the include branches anyways.
+      Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
+      while (excludeBranchIterator.hasNext())
+      {
+        DN excludeDN = excludeBranchIterator.next();
+        if (!isAnyAncestorOf(includeBranches, excludeDN))
+        {
+          excludeBranchIterator.remove();
+        }
+      }
+
+      if (excludeBranches.isEmpty()
+          && includeBranches.size() == 1
+          && includeBranches.get(0).equals(baseDN))
+      {
+        // This entire base DN is explicitly included in the import with
+        // no exclude branches that we need to migrate.
+        // Just clear the entry container.
+        clearSuffix(entryContainer);
+      }
+      else
+      {
+        sourceEntryContainer = entryContainer;
+
+        // Create a temp entry container
+        DN tempDN = baseDN.child(DN.valueOf("dc=importTmp"));
+        entryContainer = rootContainer.openEntryContainer(tempDN, txn);
+      }
+    }
+    return new Suffix(entryContainer, sourceEntryContainer, includeBranches, excludeBranches);
+  }
+
+  private List<DN> getDescendants(DN baseDN, Set<DN> dns)
+  {
+    final List<DN> results = new ArrayList<>();
+    for (DN dn : dns)
+    {
+      if (baseDN.isAncestorOf(dn))
+      {
+        results.add(dn);
+      }
+    }
+    return results;
+  }
+
+  private static void clearSuffix(EntryContainer entryContainer)
+  {
+    entryContainer.lock();
+    entryContainer.clear();
+    entryContainer.unlock();
+  }
+
+  private static boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
+  {
+    for (DN dn : dns)
+    {
+      if (!dn.equals(childDN) && dn.isAncestorOf(childDN))
+      {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static boolean isAnyAncestorOf(List<DN> dns, DN childDN)
+  {
+    for (DN dn : dns)
+    {
+      if (dn.isAncestorOf(childDN))
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private LDIFImportResult processImport() throws Exception
+  {
+    try {
+      try
+      {
+        reader = new ImportLDIFReader(importCfg, rootContainer);
+      }
+      catch (IOException ioe)
+      {
+        throw new InitializationException(ERR_IMPORT_LDIF_READER_IO_ERROR.get(), ioe);
+      }
+
+      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
+      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
+
+      final Storage storage = rootContainer.getStorage();
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableTransaction txn) throws Exception
+        {
+          initializeSuffixes(txn);
+          setIndexesTrusted(txn, false);
+        }
+      });
+
+      final long startTime = System.currentTimeMillis();
+      importPhaseOne();
+      final long phaseOneFinishTime = System.currentTimeMillis();
+      if (validateDNs)
+      {
+        dnCache.close();
+      }
+
+      if (isCanceled)
+      {
+        throw new InterruptedException("Import processing canceled.");
+      }
+
+      final long phaseTwoTime = System.currentTimeMillis();
+      importPhaseTwo();
+      if (isCanceled)
+      {
+        throw new InterruptedException("Import processing canceled.");
+      }
+      final long phaseTwoFinishTime = System.currentTimeMillis();
+
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableTransaction txn) throws Exception
+        {
+          setIndexesTrusted(txn, true);
+          switchEntryContainers(txn);
+        }
+      });
+      recursiveDelete(tempDir);
+      final long finishTime = System.currentTimeMillis();
+      final long importTime = finishTime - startTime;
+      logger.info(NOTE_IMPORT_PHASE_STATS, importTime / 1000,
+              (phaseOneFinishTime - startTime) / 1000,
+              (phaseTwoFinishTime - phaseTwoTime) / 1000);
+      float rate = 0;
+      if (importTime > 0)
+      {
+        rate = 1000f * reader.getEntriesRead() / importTime;
+      }
+      logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(),
+          reader.getEntriesIgnored(), reader.getEntriesRejected(),
+          migratedCount, importTime / 1000, rate);
+      return new LDIFImportResult(reader.getEntriesRead(),
+          reader.getEntriesRejected(), reader.getEntriesIgnored());
+    }
+    finally
+    {
+      close(reader);
+      if (validateDNs)
+      {
+        close(dnCache);
+      }
+    }
+  }
+
+  private void switchEntryContainers(WriteableTransaction txn) throws StorageRuntimeException, InitializationException
+  {
+    for (Suffix suffix : dnSuffixMap.values())
+    {
+      DN baseDN = suffix.getBaseDN();
+      EntryContainer entryContainer = suffix.getSrcEntryContainer();
+      if (entryContainer != null)
+      {
+        final EntryContainer toDelete = rootContainer.unregisterEntryContainer(baseDN);
+        toDelete.lock();
+        toDelete.close();
+        toDelete.delete(txn);
+        toDelete.unlock();
+
+        final EntryContainer replacement = suffix.getEntryContainer();
+        replacement.lock();
+        replacement.setTreePrefix(baseDN.toNormalizedUrlSafeString());
+        replacement.unlock();
+        rootContainer.registerEntryContainer(baseDN, replacement);
+      }
+    }
+  }
+
+  private void setIndexesTrusted(WriteableTransaction txn, boolean trusted) throws StorageRuntimeException
+  {
+    try
+    {
+      for (Suffix s : dnSuffixMap.values())
+      {
+        s.setIndexesTrusted(txn, trusted);
+      }
+    }
+    catch (StorageRuntimeException ex)
+    {
+      throw new StorageRuntimeException(NOTE_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()).toString());
+    }
+  }
+
+  /**
+   * Reads all entries from id2entry, and:
+   * <ol>
+   * <li>compute how the entry is indexed for each index</li>
+   * <li>store the result of indexing entries into in-memory index buffers</li>
+   * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
+   * The scratch files will be read by phaseTwo to perform on-disk merge</li>
+   * </ol>
+   */
+  private void importPhaseOne() throws Exception
+  {
+    final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+    scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
+
+    final Storage storage = rootContainer.getStorage();
+    execService.submit(new MigrateExistingTask(storage)).get();
+
+    final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
+    if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
+    {
+      for (int i = 0; i < threadCount; i++)
+      {
+        tasks.add(new ImportTask(storage));
+      }
+    }
+    execService.invokeAll(tasks);
+    tasks.clear();
+
+    execService.submit(new MigrateExcludedTask(storage)).get();
+
+    shutdownAll(timerService, execService);
+  }
+
+  private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+  {
+    timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+
+  private static void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+  {
+    for (ExecutorService executorService : executorServices)
+    {
+      executorService.shutdown();
+    }
+    for (ExecutorService executorService : executorServices)
+    {
+      executorService.awaitTermination(30, TimeUnit.SECONDS);
+    }
+  }
+
+  private void importPhaseTwo() throws Exception
+  {
+    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
+    try
+    {
+      // TODO JNR
+    }
+    finally
+    {
+      shutdownAll(timerService);
+    }
+  }
+
+  /** Task used to migrate excluded branch. */
+  private final class MigrateExcludedTask extends ImportTask
+  {
+    private MigrateExcludedTask(final Storage storage)
+    {
+      super(storage);
+    }
+
+    @Override
+    void call0(WriteableTransaction txn) throws Exception
+    {
+      for (Suffix suffix : dnSuffixMap.values())
+      {
+        EntryContainer entryContainer = suffix.getSrcEntryContainer();
+        if (entryContainer != null && !suffix.getExcludeBranches().isEmpty())
+        {
+          logger.info(NOTE_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN());
+          Cursor<ByteString, ByteString> cursor = txn.openCursor(entryContainer.getDN2ID().getName());
+          try
+          {
+            for (DN excludedDN : suffix.getExcludeBranches())
+            {
+              final ByteString key = dnToDNKey(excludedDN, suffix.getBaseDN().size());
+              boolean success = cursor.positionToKeyOrNext(key);
+              if (success && key.equals(cursor.getKey()))
+              {
+                /*
+                 * This is the base entry for a branch that was excluded in the
+                 * import so we must migrate all entries in this branch over to
+                 * the new entry container.
+                 */
+                ByteStringBuilder end = afterKey(key);
+
+                while (success
+                    && ByteSequence.COMPARATOR.compare(key, end) < 0
+                    && !importCfg.isCancelled()
+                    && !isCanceled)
+                {
+                  EntryID id = new EntryID(cursor.getValue());
+                  Entry entry = entryContainer.getID2Entry().get(txn, id);
+                  processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
+                  migratedCount++;
+                  success = cursor.next();
+                }
+              }
+            }
+          }
+          catch (Exception e)
+          {
+            logger.error(ERR_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage());
+            isCanceled = true;
+            throw e;
+          }
+          finally
+          {
+            close(cursor);
+          }
+        }
+      }
+    }
+  }
+
+  /** Task to migrate existing entries. */
+  private final class MigrateExistingTask extends ImportTask
+  {
+    private MigrateExistingTask(final Storage storage)
+    {
+      super(storage);
+    }
+
+    @Override
+    void call0(WriteableTransaction txn) throws Exception
+    {
+      for (Suffix suffix : dnSuffixMap.values())
+      {
+        EntryContainer entryContainer = suffix.getSrcEntryContainer();
+        if (entryContainer != null && !suffix.getIncludeBranches().isEmpty())
+        {
+          logger.info(NOTE_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN());
+          Cursor<ByteString, ByteString> cursor = txn.openCursor(entryContainer.getDN2ID().getName());
+          try
+          {
+            final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
+            boolean success = cursor.next();
+            while (success
+                && !importCfg.isCancelled()
+                && !isCanceled)
+            {
+              final ByteString key = cursor.getKey();
+              if (!includeBranches.contains(key))
+              {
+                EntryID id = new EntryID(key);
+                Entry entry = entryContainer.getID2Entry().get(txn, id);
+                processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
+                migratedCount++;
+                success = cursor.next();
+              }
+              else
+              {
+                /*
+                 * This is the base entry for a branch that will be included
+                 * in the import so we do not want to copy the branch to the
+                 * new entry container.
+                 */
+                /*
+                 * Advance the cursor to next entry at the same level in the DIT
+                 * skipping all the entries in this branch.
+                 */
+                ByteStringBuilder begin = afterKey(key);
+                success = cursor.positionToKeyOrNext(begin);
+              }
+            }
+          }
+          catch (Exception e)
+          {
+            logger.error(ERR_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage());
+            isCanceled = true;
+            throw e;
+          }
+          finally
+          {
+            close(cursor);
+          }
+        }
+      }
+    }
+
+    private List<ByteString> includeBranchesAsBytes(Suffix suffix)
+    {
+      List<ByteString> includeBranches = new ArrayList<>(suffix.getIncludeBranches().size());
+      for (DN includeBranch : suffix.getIncludeBranches())
+      {
+        if (includeBranch.isDescendantOf(suffix.getBaseDN()))
+        {
+          includeBranches.add(dnToDNKey(includeBranch, suffix.getBaseDN().size()));
+        }
+      }
+      return includeBranches;
+    }
+  }
+
+  /**
+   * This task performs phase reading and processing of the entries read from
+   * the LDIF file(s). This task is used if the append flag wasn't specified.
+   */
+  private class ImportTask implements Callable<Void>
+  {
+    private final Storage storage;
+
+    public ImportTask(final Storage storage)
+    {
+      this.storage = storage;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final Void call() throws Exception
+    {
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableTransaction txn) throws Exception
+        {
+          call0(txn);
+        }
+      });
+      return null;
+    }
+
+    void call0(WriteableTransaction txn) throws Exception
+    {
+      try
+      {
+        EntryInformation entryInfo;
+        while ((entryInfo = reader.readEntry(dnSuffixMap)) != null)
+        {
+          if (importCfg.isCancelled() || isCanceled)
+          {
+            return;
+          }
+          processEntry(txn, entryInfo.getEntry(), entryInfo.getEntryID(), entryInfo.getSuffix());
+        }
+      }
+      catch (Exception e)
+      {
+        logger.error(ERR_IMPORT_LDIF_IMPORT_TASK_ERR, e.getMessage());
+        isCanceled = true;
+        throw e;
+      }
+    }
+
+    void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix)
+        throws DirectoryException, StorageRuntimeException, InterruptedException
+    {
+      DN entryDN = entry.getName();
+      if (validateDNs && !dnSanityCheck(txn, entry, suffix))
+      {
+        suffix.removePending(entryDN);
+        return;
+      }
+      suffix.removePending(entryDN);
+      processDN2ID(suffix, entryDN, entryID);
+      processDN2URI(suffix, entry);
+      processIndexes(suffix, entry, entryID, false);
+      processVLVIndexes(txn, suffix, entry, entryID);
+      suffix.getID2Entry().put(txn, entryID, entry);
+      importCount.getAndIncrement();
+    }
+
+    /**
+     * Examine the DN for duplicates and missing parents.
+     *
+     * @return true if the import operation can proceed with the provided entry, false otherwise
+     */
+    @SuppressWarnings("javadoc")
+    boolean dnSanityCheck(WriteableTransaction txn, Entry entry, Suffix suffix)
+        throws StorageRuntimeException, InterruptedException
+    {
+      //Perform parent checking.
+      DN entryDN = entry.getName();
+      DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
+      if (parentDN != null && !suffix.isParentProcessed(txn, parentDN, dnCache, clearedBackend))
+      {
+        reader.rejectEntry(entry, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
+        return false;
+      }
+      if (!insert(txn, entryDN, suffix, dnCache))
+      {
+        reader.rejectEntry(entry, WARN_IMPORT_ENTRY_EXISTS.get());
+        return false;
+      }
+      return true;
+    }
+
+    private boolean insert(WriteableTransaction txn, DN entryDN, Suffix suffix, DNCache dnCache)
+    {
+      //If the backend was not cleared, then first check dn2id
+      //for DNs that might not exist in the DN cache.
+      if (!clearedBackend && suffix.getDN2ID().get(txn, entryDN) != null)
+      {
+        return false;
+      }
+      return dnCache.insert(entryDN);
+    }
+
+    void processIndexes(Suffix suffix, Entry entry, EntryID entryID, boolean allIndexes)
+        throws StorageRuntimeException, InterruptedException
+    {
+      for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
+      {
+        AttributeType attrType = mapEntry.getKey();
+        AttributeIndex attrIndex = mapEntry.getValue();
+        if (allIndexes || entry.hasAttribute(attrType))
+        {
+          for (Map.Entry<String, MatchingRuleIndex> mapEntry2 : attrIndex.getNameToIndexes().entrySet())
+          {
+            String indexID = mapEntry2.getKey();
+            MatchingRuleIndex index = mapEntry2.getValue();
+
+            IndexKey indexKey = new IndexKey(attrType, indexID, index.getIndexEntryLimit());
+            processAttribute(index, entry, entryID, indexKey);
+          }
+        }
+      }
+    }
+
+    void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID)
+        throws DirectoryException
+    {
+      final EntryContainer entryContainer = suffix.getEntryContainer();
+      final IndexBuffer buffer = new IndexBuffer(entryContainer);
+      for (VLVIndex vlvIdx : entryContainer.getVLVIndexes())
+      {
+        vlvIdx.addEntry(buffer, entryID, entry);
+      }
+      buffer.flush(txn);
+    }
+
+    void processAttribute(MatchingRuleIndex index, Entry entry, EntryID entryID, IndexKey indexKey)
+        throws StorageRuntimeException, InterruptedException
+    {
+      for (ByteString key : index.indexEntry(entry))
+      {
+        processKey(index, key, entryID, indexKey);
+      }
+    }
+
+    final int processKey(Tree tree, ByteString key, EntryID entryID, IndexKey indexKey) throws InterruptedException
+    {
+      // TODO JNR implement
+      return -1;
+    }
+
+    void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
+    {
+      // TODO JNR implement
+    }
+
+    private void processDN2URI(Suffix suffix, Entry entry)
+    {
+      // TODO JNR implement
+    }
+  }
+
+  /** This class reports progress of first phase of import processing at fixed intervals. */
+  private final class FirstPhaseProgressTask extends TimerTask
+  {
+    /** The number of entries that had been read at the time of the previous progress report. */
+    private long previousCount;
+    /** The time in milliseconds of the previous progress report. */
+    private long previousTime;
+
+    /** Create a new import progress task. */
+    public FirstPhaseProgressTask()
+    {
+      previousTime = System.currentTimeMillis();
+    }
+
+    /** The action to be performed by this timer task. */
+    @Override
+    public void run()
+    {
+      long entriesRead = reader.getEntriesRead();
+      long entriesIgnored = reader.getEntriesIgnored();
+      long entriesRejected = reader.getEntriesRejected();
+      long deltaCount = entriesRead - previousCount;
+
+      long latestTime = System.currentTimeMillis();
+      long deltaTime = latestTime - previousTime;
+      if (deltaTime == 0)
+      {
+        return;
+      }
+      float rate = 1000f * deltaCount / deltaTime;
+      logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
+
+      previousCount = entriesRead;
+      previousTime = latestTime;
+    }
+  }
+
+  /** This class reports progress of the second phase of import processing at fixed intervals. */
+  private class SecondPhaseProgressTask extends TimerTask
+  {
+    /** The time in milliseconds of the previous progress report. */
+    private long previousTime;
+
+    /** Create a new import progress task. */
+    public SecondPhaseProgressTask()
+    {
+      previousTime = System.currentTimeMillis();
+    }
+
+    /** The action to be performed by this timer task. */
+    @Override
+    public void run()
+    {
+      long latestTime = System.currentTimeMillis();
+      long deltaTime = latestTime - previousTime;
+      if (deltaTime == 0)
+      {
+        return;
+      }
+
+      previousTime = latestTime;
+
+      // DN index managers first.
+      printStats(deltaTime, true);
+      // non-DN index managers second
+      printStats(deltaTime, false);
+    }
+
+    private void printStats(long deltaTime, boolean dn2id)
+    {
+      // TODO JNR
+    }
+  }
+
+  /** Invocation handler for the {@link PluggableBackendCfg} proxy. */
+  private static final class BackendCfgHandler implements InvocationHandler
+  {
+    private final Map<String, Object> returnValues;
+
+    private BackendCfgHandler(final Map<String, Object> returnValues)
+    {
+      this.returnValues = returnValues;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+    {
+      final String methodName = method.getName();
+      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
+      {
+        // ignore calls to (add|remove)*ChangeListener() methods
+        return null;
+      }
+
+      final Object returnValue = returnValues.get(methodName);
+      if (returnValue != null)
+      {
+        return returnValue;
+      }
+      throw new IllegalArgumentException("Unhandled method call on proxy ("
+          + BackendCfgHandler.class.getSimpleName()
+          + ") for method (" + method
+          + ") with arguments (" + Arrays.toString(args) + ")");
+    }
+  }
+
+  /**
+   * Used to check DN's when DN validation is performed during phase one processing.
+   * It is deleted after phase one processing.
+   */
+  private final class DNCacheImpl implements DNCache
+  {
+    private static final String DB_NAME = "dn_cache";
+    private final TreeName dnCache = new TreeName("", DB_NAME);
+    private final Storage storage;
+
+    private DNCacheImpl(File dnCachePath) throws StorageRuntimeException
+    {
+      final Map<String, Object> returnValues = new HashMap<>();
+      returnValues.put("getDBDirectory", dnCachePath.getAbsolutePath());
+      returnValues.put("getBackendId", DB_NAME);
+      returnValues.put("getDBCacheSize", 0L);
+      returnValues.put("getDBCachePercent", 10);
+      returnValues.put("isDBTxnNoSync", true);
+      returnValues.put("getDBDirectoryPermissions", "700");
+      returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
+      returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
+      try
+      {
+        returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importDNCache,cn=Backends,cn=config"));
+        storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
+            DirectoryServer.getInstance().getServerContext());
+        storage.open();
+        storage.write(new WriteOperation()
+        {
+          @Override
+          public void run(WriteableTransaction txn) throws Exception
+          {
+            txn.openTree(dnCache);
+          }
+        });
+      }
+      catch (Exception e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+
+    private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
+    {
+      return (PersistitBackendCfg) Proxy.newProxyInstance(
+          getClass().getClassLoader(),
+          new Class<?>[] { PersistitBackendCfg.class },
+          new BackendCfgHandler(returnValues));
+    }
+
+    private static final long FNV_INIT = 0xcbf29ce484222325L;
+    private static final long FNV_PRIME = 0x100000001b3L;
+
+    /** Hash the DN bytes. Uses the FNV-1a hash. */
+    private ByteString fnv1AHashCode(DN dn)
+    {
+      final ByteString b = dn.toNormalizedByteString();
+
+      long hash = FNV_INIT;
+      for (int i = 0; i < b.length(); i++)
+      {
+        hash ^= b.byteAt(i);
+        hash *= FNV_PRIME;
+      }
+      return ByteString.valueOf(hash);
+    }
+
+    @Override
+    public void close() throws StorageRuntimeException
+    {
+      try
+      {
+        storage.close();
+      }
+      finally
+      {
+        storage.removeStorageFiles();
+      }
+    }
+
+    @Override
+    public boolean insert(DN dn) throws StorageRuntimeException
+    {
+      // Use a compact representation for key
+      // and a reversible representation for value
+      final ByteString key = fnv1AHashCode(dn);
+      final ByteString dnValue = ByteString.valueOf(dn);
+
+      return insert(key, dnValue);
+    }
+
+    private boolean insert(final ByteString key, final ByteString dn) throws StorageRuntimeException
+    {
+      final AtomicBoolean updateResult = new AtomicBoolean();
+      try
+      {
+        storage.write(new WriteOperation()
+        {
+          @Override
+          public void run(WriteableTransaction txn) throws Exception
+          {
+            updateResult.set(txn.update(dnCache, key, new UpdateFunction()
+            {
+              @Override
+              public ByteSequence computeNewValue(ByteSequence existingDns)
+              {
+                if (containsDN(existingDns, dn))
+                {
+                  // no change
+                  return existingDns;
+                }
+                else if (existingDns != null)
+                {
+                  return addDN(existingDns, dn);
+                }
+                else
+                {
+                  return singletonList(dn);
+                }
+              }
+
+              /** Add the DN to the DNs because of a hash collision. */
+              private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
+              {
+                final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
+                builder.append(dnList);
+                builder.append(dntoAdd.length());
+                builder.append(dntoAdd);
+                return builder;
+              }
+
+              /** Create a list of dn made of one element. */
+              private ByteSequence singletonList(final ByteSequence dntoAdd)
+              {
+                final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
+                singleton.append(dntoAdd.length());
+                singleton.append(dntoAdd);
+                return singleton;
+              }
+            }));
+          }
+        });
+        return updateResult.get();
+      }
+      catch (StorageRuntimeException e)
+      {
+        throw e;
+      }
+      catch (Exception e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+
+    /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
+    private boolean containsDN(ByteSequence existingDns, ByteString dnToFind)
+    {
+      if (existingDns != null && existingDns.length() > 0)
+      {
+        final ByteSequenceReader reader = existingDns.asReader();
+        int pos = 0;
+        while (reader.remaining() != 0)
+        {
+          int dnLength = reader.getInt();
+          int dnStart = pos + INT_SIZE;
+          ByteSequence existingDn = existingDns.subSequence(dnStart, dnStart + dnLength);
+          if (dnToFind.equals(existingDn))
+          {
+            return true;
+          }
+          reader.skip(dnLength);
+          pos = reader.position();
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean contains(final DN dn)
+    {
+      try
+      {
+        return storage.read(new ReadOperation<Boolean>()
+        {
+          @Override
+          public Boolean run(ReadableTransaction txn) throws Exception
+          {
+            final ByteString key = fnv1AHashCode(dn);
+            final ByteString existingDns = txn.read(dnCache, key);
+
+            return containsDN(existingDns, ByteString.valueOf(dn));
+          }
+        });
+      }
+      catch (StorageRuntimeException e)
+      {
+        throw e;
+      }
+      catch (Exception e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+  }
+}

--
Gitblit v1.10.0