From 5111f7cbf41682bebb67bd0876818b9432961101 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 05 Sep 2016 09:33:27 +0000
Subject: [PATCH] OPENDJ-3263: import with DN validation on JE is using high disk space

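With DN validation enabled, phase one used to write dn2id straight into
the live database and to look up the parent of every imported entry
through a small LRU presence cache (SortAndImportWithDNValidation),
which drove the high on-disk footprint on JE during import.

DN validation now happens during phase two instead: dn2id is externally
sorted like every other tree, and the new DnValidationCursorDecorator
rejects duplicate and orphan DNs in a single pass while the sorted keys
are copied into the database. The two import strategies therefore
collapse into a single ExternalSortAndImportStrategy, and
ImportStrategy declares typed exceptions instead of "throws Exception".

Because the sorted dn2id keys place each parent immediately before its
children, one ancestor stack is enough to validate the whole tree.
Below is a minimal sketch of that pass, with plain strings and a
simplified isChild() standing in for DnKeyFormat (as in the real keys,
the base DN is the empty string):

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;

    final class SortedDnValidationSketch
    {
      /** Simplified stand-in for DnKeyFormat.isChild(): a child key is its parent key plus one RDN. */
      static boolean isChild(String parentKey, String childKey)
      {
        final String prefix = parentKey.isEmpty() ? "" : parentKey + ",";
        return childKey.startsWith(prefix) && childKey.indexOf(',', prefix.length()) < 0;
      }

      /** Throws on the first duplicate or orphan key; keys must arrive in sorted order. */
      static void validate(List<String> sortedDnKeys)
      {
        final Deque<String> ancestors = new ArrayDeque<>();
        for (String dn : sortedDnKeys)
        {
          // Sorted input means a duplicate key is always adjacent to its twin.
          if (dn.equals(ancestors.peekLast()))
          {
            throw new IllegalStateException("duplicate entry: " + dn); // ENTRY_ALREADY_EXISTS
          }
          // Drop completed sibling subtrees until the direct parent is on top of the stack.
          while (!ancestors.isEmpty() && !isChild(ancestors.peekLast(), dn))
          {
            ancestors.removeLast();
          }
          if (ancestors.isEmpty() && !dn.isEmpty())
          {
            throw new IllegalStateException("missing parent: " + dn); // NO_SUCH_OBJECT
          }
          ancestors.addLast(dn);
        }
      }

      public static void main(String[] args)
      {
        // Passes: base DN first (empty key), every parent before its children.
        validate(Arrays.asList("", "ou=people", "ou=people,uid=user.0"));
        // Throws "missing parent": ou=people1 was never imported.
        validate(Arrays.asList("", "ou=people", "ou=people1,uid=doh"));
      }
    }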
---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java          |   30 ++
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java     |  508 +++++++++++++++++++++++----------------
 opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java |  111 +++++++++
 3 files changed, 389 insertions(+), 260 deletions(-)
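
Reviewer note: a DN validation failure now surfaces from phase two as a
StorageRuntimeException wrapping a DirectoryException (NO_SUCH_OBJECT for
a missing parent, ENTRY_ALREADY_EXISTS for a duplicate), and the
ImportStrategy methods wrap lower-level failures in ExecutionException.
A fragment sketching how a caller might recover the result code (the
"strategy" and "importConfig" names are assumed here, not part of this
patch):

    try
    {
      strategy.importLDIF(importConfig);
    }
    catch (ExecutionException e)
    {
      // Walk the cause chain: a phase-two validation error arrives as a
      // StorageRuntimeException whose cause is the DirectoryException
      // carrying the result code.
      for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause())
      {
        if (cause instanceof DirectoryException)
        {
          System.err.println(((DirectoryException) cause).getResultCode());
          break;
        }
      }
    }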

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
index 998628d..50e5d31 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
@@ -11,11 +11,15 @@
  * Header, with the fields enclosed by brackets [] replaced by your own identifying
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
- * Copyright 2015 ForgeRock AS.
+ * Copyright 2015-2016 ForgeRock AS.
  */
 package org.opends.server.backends.pluggable;
 
+import java.util.concurrent.ExecutionException;
+
+import org.forgerock.opendj.config.server.ConfigException;
 import org.opends.server.backends.RebuildConfig;
+import org.opends.server.types.InitializationException;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.LDIFImportResult;
 
@@ -28,20 +32,34 @@
    * @param importConfig
    *          The configuration to use when performing the import
    * @return Information about the result of the import processing
-   * @throws Exception
+   * @throws InitializationException
+   *           If a problem occurs during initialization
+   * @throws ConfigException
+   *           If the configuration is invalid
+   * @throws InterruptedException
+   *           If the import process has been interrupted
+   * @throws ExecutionException
    *           If a problem occurs while performing the LDIF import
    * @see {@link Backend#importLDIF(LDIFImportConfig, ServerContext)}
    */
-  LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception;
+  LDIFImportResult importLDIF(LDIFImportConfig importConfig)
+      throws InitializationException, ConfigException, InterruptedException, ExecutionException;
 
   /**
    * Rebuild indexes.
    *
    * @param rebuildConfig
    *          The configuration to use when performing the rebuild.
-   * @throws Exception
-   *           If a problem occurs while performing the rebuild.
+   * @throws InitializationException
+   *           If a problem occurs during initialization
+   * @throws ConfigException
+   *           If the configuration is invalid
+   * @throws InterruptedException
+   *           If the rebuild process has been interrupted
+   * @throws ExecutionException
+   *           If a problem occurs while performing the rebuild
    * @see {@link Backend#rebuildIndex(RebuildConfig, ServerContext)}
    */
-  void rebuildIndex(RebuildConfig rebuildConfig) throws Exception;
+  void rebuildIndex(RebuildConfig rebuildConfig)
+      throws InitializationException, ConfigException, InterruptedException, ExecutionException;
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index e46e160..b38f931 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -36,6 +36,7 @@
 import static org.opends.messages.BackendMessages.*;
 import static org.opends.server.util.DynamicConstants.*;
 import static org.opends.server.util.StaticUtils.*;
+import static org.forgerock.opendj.ldap.ResultCode.*;
 
 import java.io.Closeable;
 import java.io.File;
@@ -58,7 +59,7 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -98,14 +99,13 @@
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.DN;
-import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.spi.Indexer;
-import org.forgerock.util.Reject;
-import org.forgerock.util.Utils;
-import org.forgerock.util.promise.PromiseImpl;
 import org.forgerock.opendj.server.config.meta.BackendIndexCfgDefn.IndexType;
 import org.forgerock.opendj.server.config.server.BackendIndexCfg;
 import org.forgerock.opendj.server.config.server.PluggableBackendCfg;
+import org.forgerock.util.Reject;
+import org.forgerock.util.Utils;
+import org.forgerock.util.promise.PromiseImpl;
 import org.opends.server.api.CompressedSchema;
 import org.opends.server.backends.RebuildConfig;
 import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
@@ -180,7 +180,8 @@
     }
 
     @Override
-    public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
+    public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
+        throws InitializationException, ConfigException, InterruptedException, ExecutionException
     {
       logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
 
@@ -196,45 +197,49 @@
         final OnDiskMergeImporter importer;
         final ExecutorService sorter =
             Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
-        final LDIFReaderSource source =
-            new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
-        final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
-        try (final Importer dbStorage = rootContainer.getStorage().startImport())
+        try (final LDIFReaderSource source =
+            new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount))
         {
-          final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
-          final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
-              ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
-              : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
-
-          importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
-          importer.doImport(source);
-        }
-        finally
-        {
-          sorter.shutdownNow();
-          if (OperatingSystem.isWindows())
+          final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
+          try (final Importer dbStorage = rootContainer.getStorage().startImport())
           {
-            // Try to force the JVM to close mmap()ed file so that they can be deleted.
-            // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
-            System.gc();
-            Runtime.getRuntime().runFinalization();
+            final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
+            final AbstractTwoPhaseImportStrategy importStrategy =
+                new ExternalSortAndImportStrategy(entryContainers, dbStorage, tempDir, bufferPool, sorter);
+            importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
+            importer.doImport(source);
           }
-          recursiveDelete(tempDir);
-        }
-        logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
-            / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
+          finally
+          {
+            sorter.shutdownNow();
+            if (OperatingSystem.isWindows())
+            {
+              // Try to force the JVM to close mmap()ed files so that they can be deleted.
+              // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
+              System.gc();
+              Runtime.getRuntime().runFinalization();
+            }
+            recursiveDelete(tempDir);
+          }
+          logger.info(NOTE_IMPORT_PHASE_STATS,
+                      importer.getTotalTimeInMillis() / 1000,
+                      importer.getPhaseOneTimeInMillis() / 1000,
+                      importer.getPhaseTwoTimeInMillis() / 1000);
 
-        final long importTime = System.currentTimeMillis() - startTime;
-        float rate = 0;
-        if (importTime > 0)
-        {
-          rate = 1000f * source.getEntriesRead() / importTime;
+          final long importTime = System.currentTimeMillis() - startTime;
+          float rate = 0;
+          if (importTime > 0)
+          {
+            rate = 1000f * source.getEntriesRead() / importTime;
+          }
+          logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(),
+              source.getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
+          return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source.getEntriesIgnored());
         }
-        logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
-            .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
-
-        return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
-            .getEntriesIgnored());
+      }
+      catch (IOException e)
+      {
+        throw new ExecutionException(e);
       }
     }
 
@@ -265,17 +270,26 @@
     }
 
     @Override
-    public void rebuildIndex(final RebuildConfig rebuildConfig) throws Exception
+    public void rebuildIndex(final RebuildConfig rebuildConfig)
+        throws InitializationException, ExecutionException, ConfigException, InterruptedException
     {
       final EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
-      final long totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>()
+      final long totalEntries;
+      try
       {
-        @Override
-        public Long run(ReadableTransaction txn) throws Exception
+        totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>()
         {
-          return entryContainer.getID2Entry().getRecordCount(txn);
-        }
-      });
+          @Override
+          public Long run(ReadableTransaction txn) throws Exception
+          {
+            return entryContainer.getID2Entry().getRecordCount(txn);
+          }
+        });
+      }
+      catch (Exception e)
+      {
+        throw new ExecutionException(e);
+      }
 
       final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries);
       if (rebuildConfig.isClearDegradedState())
@@ -289,20 +303,28 @@
       }
     }
 
-    private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes) throws Exception
+    private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes)
+        throws ExecutionException
     {
-      rootContainer.getStorage().write(new WriteOperation()
+      try
       {
-        @Override
-        public void run(WriteableTransaction txn) throws Exception
+        rootContainer.getStorage().write(new WriteOperation()
         {
-          visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn)));
-        }
-      });
+          @Override
+          public void run(WriteableTransaction txn)
+          {
+            visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn)));
+          }
+        });
+      }
+      catch (Exception e)
+      {
+        throw new ExecutionException(e);
+      }
     }
 
     private void rebuildIndex(EntryContainer entryContainer, String tmpDirectory, Set<String> indexesToRebuild,
-        long totalEntries) throws Exception
+        long totalEntries) throws InitializationException, ConfigException, InterruptedException, ExecutionException
     {
       if (indexesToRebuild.isEmpty())
       {
@@ -593,7 +615,7 @@
   }
 
   /** Source of LDAP {@link Entry}s to process. */
-  private interface Source
+  private interface Source extends Closeable
   {
     /** Process {@link Entry}s extracted from a {@link Source}. */
     interface EntryProcessor
@@ -601,7 +623,7 @@
       void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws Exception;
     }
 
-    void processAllEntries(EntryProcessor processor) throws Exception;
+    void processAllEntries(EntryProcessor processor) throws InterruptedException, ExecutionException;
 
     boolean isCancelled();
   }
@@ -632,7 +654,13 @@
     }
 
     @Override
-    public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
+    public void close()
+    {
+      closeSilently(reader);
+    }
+
+    @Override
+    public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException
     {
       final ScheduledExecutorService scheduler =
           Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
@@ -792,12 +820,17 @@
     }
 
     @Override
-    public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
+    public void close()
+    {
+      executor.shutdown();
+    }
+
+    @Override
+    public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException
     {
       final ScheduledExecutorService scheduler =
           Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
       scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS);
-      final PromiseImpl<Void, Exception> promise = PromiseImpl.create();
+      final PromiseImpl<Void, ExecutionException> promise = PromiseImpl.create();
       final ID2Entry id2Entry = entryContainer.getID2Entry();
       try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(id2Entry.getName()))
       {
@@ -816,11 +849,16 @@
                     new EntryID(key), id2Entry.entryFromDatabase(value, schema));
                 nbEntriesProcessed.incrementAndGet();
               }
-              catch (Exception e)
+              catch (ExecutionException e)
               {
                 interrupted = true;
                 promise.handleException(e);
               }
+              catch (Exception e)
+              {
+                interrupted = true;
+                promise.handleException(new ExecutionException(e));
+              }
             }
           });
         }
@@ -835,7 +873,7 @@
       // Forward exception if any
       if (promise.isDone())
       {
-        promise.getOrThrow(0, TimeUnit.SECONDS);
+        promise.getOrThrow();
       }
     }
 
@@ -904,7 +942,7 @@
     this.importStrategy = importStrategy;
   }
 
-  private void doImport(final Source source) throws Exception
+  private void doImport(final Source source) throws InterruptedException, ExecutionException
   {
     final long phaseOneStartTime = System.currentTimeMillis();
     final PhaseOneWriteableTransaction transaction = new PhaseOneWriteableTransaction(importStrategy);
@@ -913,36 +951,42 @@
     final ConcurrentMap<EntryContainer, CountDownLatch> importedContainers = new ConcurrentHashMap<>();
 
     // Start phase one
-    source.processAllEntries(new Source.EntryProcessor()
+    try
     {
-      @Override
-      public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
-          InterruptedException
+      source.processAllEntries(new Source.EntryProcessor()
       {
-        CountDownLatch latch = importedContainers.get(container);
-        if (latch == null)
+        @Override
+        public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
+            InterruptedException
         {
-          final CountDownLatch newLatch = new CountDownLatch(1);
-          if (importedContainers.putIfAbsent(container, newLatch) == null)
+          CountDownLatch latch = importedContainers.get(container);
+          if (latch == null)
           {
-            try
+            final CountDownLatch newLatch = new CountDownLatch(1);
+            if (importedContainers.putIfAbsent(container, newLatch) == null)
             {
-              importStrategy.beforePhaseOne(container);
+              try
+              {
+                importStrategy.beforePhaseOne(container);
+              }
+              finally
+              {
+                newLatch.countDown();
+              }
             }
-            finally
-            {
-              newLatch.countDown();
-            }
+            latch = importedContainers.get(container);
           }
-          latch = importedContainers.get(container);
-        }
-        latch.await();
+          latch.await();
 
-        importStrategy.validate(container, entryID, entry);
-        container.importEntry(transaction, entryID, entry);
-        importedCount.incrementAndGet();
-      }
-    });
+          container.importEntry(transaction, entryID, entry);
+          importedCount.incrementAndGet();
+        }
+      });
+    }
+    finally
+    {
+      closeSilently(source);
+    }
     phaseOneTimeMs = System.currentTimeMillis() - phaseOneStartTime;
 
     if (source.isCancelled())
@@ -1023,8 +1067,6 @@
       this.sorter = sorter;
     }
 
-    abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
-
     void beforePhaseOne(EntryContainer entryContainer)
     {
       entryContainer.delete(asWriteableTransaction(importer));
@@ -1057,13 +1099,14 @@
     }
 
     final Callable<Void> newDN2IDImporterTask(TreeName treeName, final Chunk source,
-        PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported)
+        PhaseTwoProgressReporter progressReporter)
     {
       final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
+      final ID2Entry id2entry = entryContainer.getID2Entry();
       final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
 
-      return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), source,
-          id2count, newPhaseTwoCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
+      return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, id2entry, entryContainer.getDN2ID(),
+          source, id2count, newPhaseTwoCollector(entryContainer, id2count.getName()));
     }
 
     final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source,
@@ -1091,25 +1134,20 @@
   }
 
   /**
-   * No validation is performed, every {@link TreeName} (but id2entry) are imported into dedicated
-   * {@link ExternalSortChunk} before being imported into the {@link Importer}. id2entry which is directly copied into
-   * the database through {@link ImporterToChunkAdapter}.
+   * During phase one, imports every {@link TreeName} (except id2entry) into a dedicated temporary
+   * {@link ExternalSortChunk}, which sorts the keys in ascending order. Phase two then copies the sorted keys into
+   * the database using the {@link Importer}. The id2entry tree is written directly into the database through the
+   * {@link ImporterToChunkAdapter}.
    */
-  private static final class SortAndImportWithoutDNValidation extends AbstractTwoPhaseImportStrategy
+  private static final class ExternalSortAndImportStrategy extends AbstractTwoPhaseImportStrategy
   {
-    SortAndImportWithoutDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
+    ExternalSortAndImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
         BufferPool bufferPool, Executor sorter)
     {
       super(entryContainers, importer, tempDir, bufferPool, sorter);
     }
 
     @Override
-    public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry)
-    {
-      // No validation performed. All entries are considered valid.
-    }
-
-    @Override
     public Chunk newChunk(TreeName treeName) throws Exception
     {
       if (isID2Entry(treeName))
@@ -1131,7 +1169,7 @@
       }
       else if (isDN2ID(treeName))
       {
-        return newDN2IDImporterTask(treeName, source, progressReporter, false);
+        return newDN2IDImporterTask(treeName, source, progressReporter);
       }
       else if (isVLVIndex(entryContainer, treeName))
       {
@@ -1141,96 +1179,6 @@
     }
   }
 
-  /**
-   * This strategy performs two validations by ensuring that there is no duplicate entry (entry with same DN) and that
-   * the given entry has an existing parent. To do so, the dn2id is directly imported into the database in addition of
-   * id2entry. Others tree are externally sorted before being imported into the database.
-   */
-  private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements
-      ReadableTransaction
-  {
-    private static final int DN_CACHE_SIZE = 16;
-    private final LRUPresenceCache<DN> dnCache = new LRUPresenceCache<>(DN_CACHE_SIZE);
-
-    SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
-        BufferPool bufferPool, Executor sorter)
-    {
-      super(entryContainers, importer, tempDir, bufferPool, sorter);
-    }
-
-    @Override
-    public Chunk newChunk(TreeName treeName) throws Exception
-    {
-      if (isID2Entry(treeName))
-      {
-        return new MostlyOrderedChunk(asChunk(treeName, importer));
-      }
-      else if (isDN2ID(treeName))
-      {
-        return asChunk(treeName, importer);
-      }
-      return newExternalSortChunk(treeName);
-    }
-
-    @Override
-    public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk source,
-        PhaseTwoProgressReporter progressReporter)
-    {
-      final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
-
-      if (isID2Entry(treeName))
-      {
-        return newFlushTask(source);
-      }
-      else if (isDN2ID(treeName))
-      {
-        return newDN2IDImporterTask(treeName, source, progressReporter, true);
-      }
-      else if (isVLVIndex(entryContainer, treeName))
-      {
-        return newVLVIndexImporterTask(getVLVIndex(entryContainer, treeName), source, progressReporter);
-      }
-      return newChunkCopierTask(treeName, source, progressReporter);
-    }
-
-    @Override
-    public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
-    {
-      final DN2ID dn2Id = entryContainer.getDN2ID();
-      final DN entryDN = entry.getName();
-      final DN parentDN = entryContainer.getParentWithinBase(entryDN);
-
-      if (parentDN != null && !dnCache.contains(parentDN) && dn2Id.get(this, parentDN) == null)
-      {
-        throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
-      }
-
-      if (dn2Id.get(this, entryDN) != null)
-      {
-        throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry));
-      }
-      dnCache.add(entryDN);
-    }
-
-    @Override
-    public ByteString read(TreeName treeName, ByteSequence key)
-    {
-      return importer.read(treeName, key);
-    }
-
-    @Override
-    public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getRecordCount(TreeName treeName)
-    {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   /** Import only a specific indexes list while ignoring everything else. */
   private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy
   {
@@ -1241,7 +1189,7 @@
     {
       super(entryContainers, importer, tempDir, bufferPool, sorter);
       this.indexesToRebuild = new HashSet<>(indexNames.size());
-      for(String indexName : indexNames)
+      for (String indexName : indexNames)
       {
         this.indexesToRebuild.add(indexName.toLowerCase());
       }
@@ -1280,7 +1228,7 @@
       {
         if (isDN2ID(treeName))
         {
-          return newDN2IDImporterTask(treeName, source, progressReporter, false);
+          return newDN2IDImporterTask(treeName, source, progressReporter);
         }
         else if (isVLVIndex(entryContainer, treeName))
         {
@@ -1291,12 +1239,6 @@
       // Do nothing (flush null chunk)
       return newFlushTask(source);
     }
-
-    @Override
-    public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
-    {
-      // No validation performed. All entries are considered valid.
-    }
   }
 
   private static <V> List<V> invokeParallel(String threadNameTemplate, Collection<Callable<V>> tasks)
@@ -1566,7 +1508,7 @@
         throw new StorageRuntimeException(e);
       }
 
-      return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
+      final CompositeCursor<ByteString, ByteString> cursor = new CompositeCursor<ByteString, ByteString>(name, cursors)
       {
         @Override
         public void close()
@@ -1587,7 +1529,10 @@
           }
           closeSilently(channel);
         }
-      }, (Collector<?, ByteString>) phaseTwoDeduplicator);
+      };
+      return phaseTwoDeduplicator != null
+          ? new CollectorCursor<>(cursor, (Collector<?, ByteString>) phaseTwoDeduplicator)
+          : cursor;
     }
 
     @Override
@@ -1625,8 +1570,9 @@
           checkThreadNotInterrupted();
           final int regionSize;
           try (final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
-               final SequentialCursor<ByteString, ByteString> source =
-                new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
+               final SequentialCursor<ByteString, ByteString> source = phaseOneDeduplicator != null
+                     ? new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator)
+                     : chunk.flip())
           {
             regionSize = region.write(source);
           }
@@ -2352,6 +2298,7 @@
     private final Importer importer;
     private final File tempDir;
     private final BufferPool bufferPool;
+    private final ID2Entry id2entry;
     private final DN2ID dn2id;
     private final ID2ChildrenCount id2count;
     private final Collector<?, ByteString> id2countCollector;
@@ -2359,18 +2306,19 @@
     private final Chunk dn2IdDestination;
 
     DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool,
-        DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count, Collector<?, ByteString> id2countCollector,
-        boolean dn2idAlreadyImported)
+        ID2Entry id2Entry, DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count,
+        Collector<?, ByteString> id2countCollector)
     {
       this.reporter = progressReporter;
       this.importer = importer;
       this.tempDir = tempDir;
       this.bufferPool = bufferPool;
+      this.id2entry = id2Entry;
       this.dn2id = dn2id;
       this.dn2IdSourceChunk = dn2IdChunk;
       this.id2count = id2count;
       this.id2countCollector = id2countCollector;
-      this.dn2IdDestination = dn2idAlreadyImported ? nullChunk() : asChunk(dn2id.getName(), importer);
+      this.dn2IdDestination = asChunk(dn2id.getName(), importer);
     }
 
     @Override
@@ -2381,17 +2329,20 @@
               id2countCollector, sameThreadExecutor());
       long totalNumberOfEntries = 0;
 
-      final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
-      try (final MeteredCursor<ByteString, ByteString> chunkCursor = dn2IdSourceChunk.flip();
-          final SequentialCursor<ByteString, ByteString> dn2idCursor =
-              dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
+      final TreeVisitor<ChildrenCount> childrenCountVisitor =
+          new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
+      try (final SequentialCursor<ByteString, ByteString> chunkCursor =
+               trackCursorProgress(reporter, dn2IdSourceChunk.flip());
+           final DnValidationCursorDecorator validatorCursor =
+               new DnValidationCursorDecorator(chunkCursor, id2entry, asWriteableTransaction(importer));
+           final SequentialCursor<ByteString, ByteString> dn2idCursor =
+               dn2id.openCursor(validatorCursor, childrenCountVisitor))
       {
-        checkThreadNotInterrupted();
         while (dn2idCursor.next())
         {
+          checkThreadNotInterrupted();
           dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
           totalNumberOfEntries++;
-          checkThreadNotInterrupted();
         }
       }
       id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2445,6 +2396,95 @@
     }
   }
 
+  /**
+   * Throws a {@link StorageRuntimeException} when a duplicate or orphan DN is detected. The DNs returned by the
+   * decorated cursor must be sorted.
+   */
+  static final class DnValidationCursorDecorator extends
+      SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString>
+  {
+    private final LinkedList<ByteString> parentDns = new LinkedList<>();
+    private final ID2Entry id2entry;
+    private final ReadableTransaction txn;
+
+    DnValidationCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, ID2Entry id2entry,
+        ReadableTransaction txn)
+    {
+      super(delegate);
+      this.id2entry = id2entry;
+      this.txn = txn;
+    }
+
+    @Override
+    public boolean next()
+    {
+      if (!delegate.next())
+      {
+        return false;
+      }
+      final ByteString dn = delegate.getKey();
+      try
+      {
+        throwIfDuplicate(dn);
+        throwIfOrphan(dn);
+      }
+      catch (DirectoryException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+      parentDns.add(dn);
+      return true;
+    }
+
+    private void throwIfDuplicate(ByteString dn) throws DirectoryException
+    {
+      if (dn.equals(parentDns.peekLast()))
+      {
+        throw new DirectoryException(ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(getDnAsString()));
+      }
+    }
+
+    private String getDnAsString()
+    {
+      try
+      {
+        return id2entry.get(txn, new EntryID(delegate.getValue())).getName().toString();
+      }
+      catch (Exception e)
+      {
+        return DnKeyFormat.keyToDNString(delegate.getKey());
+      }
+    }
+
+    private void throwIfOrphan(ByteString dn) throws DirectoryException
+    {
+      if (!parentExists(dn))
+      {
+        throw new DirectoryException(NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(getDnAsString()));
+      }
+    }
+
+    private boolean parentExists(ByteString childDn)
+    {
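+      // Keys arrive in sorted order, so parentDns holds the ancestor chain of the previously
+      // accepted DN (most recent last): if childDn has a parent at all, it is already on this stack.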
+      final Iterator<ByteString> it = parentDns.descendingIterator();
+      int i = parentDns.size();
+      while (it.hasNext())
+      {
+        if (DnKeyFormat.isChild(it.next(), childDn))
+        {
+          if (i < parentDns.size())
+          {
+            // Reset the last element in the stack to be the parentDn:
+            // (removes siblings, nephews, grand-nephews, etc. of childDn)
+            parentDns.subList(i, parentDns.size()).clear();
+          }
+          return true;
+        }
+        i--;
+      }
+      // The first DN must be the base DN, which is encoded as an empty ByteString.
+      return parentDns.isEmpty() && childDn.isEmpty();
+    }
+  }
+
   private static Importer asImporter(Chunk chunk)
   {
     return new ChunkToImporterAdapter(chunk);
@@ -3037,7 +3077,12 @@
       // key conflicts == sum values
       return ID2ChildrenCount.getSumLongCollectorInstance();
     }
-    else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
+    else if (isDN2ID(treeName))
+    {
+      // Detection of duplicate DNs is performed during phase two by the DN2IDImporterTask.
+      return null;
+    }
+    else if (isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
     {
       // key conflicts == exception
       return UniqueValueCollector.getInstance();
@@ -3681,41 +3726,6 @@
     }
   }
 
-  /**
-   * Thread-safe fixed-size cache which, once full, remove the least recently accessed entry. Composition is used here
-   * to ensure that only methods generating entry-access in the LinkedHashMap are actually used. Otherwise, the least
-   * recently used property of the cache would not be respected.
-   */
-  private static final class LRUPresenceCache<T>
-  {
-    private final Map<T, Object> cache;
-
-    LRUPresenceCache(final int maxEntries)
-    {
-      // +1 because newly added entry is added before the least recently one is removed.
-      this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
-      {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
-        {
-          return size() >= maxEntries;
-        }
-      });
-    }
-
-    public boolean contains(T object)
-    {
-      return cache.get(object) != null;
-    }
-
-    public void add(T object)
-    {
-      cache.put(object, null);
-    }
-  }
-
   private static WriteableTransaction asWriteableTransaction(Importer importer)
   {
     return new ImporterToWriteableTransactionAdapter(importer);
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index 65b6101..947b3f4 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -17,6 +17,10 @@
 
 import static org.assertj.core.api.Assertions.*;
 import static org.opends.server.backends.pluggable.EntryIDSet.*;
+import static org.forgerock.opendj.ldap.ResultCode.*;
+import static org.forgerock.util.Pair.of;
+import static org.mockito.Mockito.*;
+import static org.opends.server.backends.pluggable.DnKeyFormat.dnToDNKey;
 
 import java.io.File;
 import java.nio.ByteBuffer;
@@ -38,6 +42,8 @@
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.util.Pair;
 import org.mockito.Mockito;
 import org.opends.server.DirectoryServerTestCase;
@@ -46,6 +52,7 @@
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.DnValidationCursorDecorator;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.EntryIDSetsCollector;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor;
@@ -60,6 +67,7 @@
 import org.opends.server.backends.pluggable.spi.TreeName;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.crypto.CryptoSuite;
+import org.opends.server.types.DirectoryException;
 import org.testng.annotations.Test;
 
 import com.forgerock.opendj.util.PackedLong;
@@ -78,6 +86,100 @@
     testBufferImplementation(new MemoryBuffer(ByteBuffer.allocateDirect(1024)));
   }
 
+  @Test
+  public void testDnValidationThrowsOnMissingBaseDn() throws DirectoryException
+  {
+    // Given
+    final SequentialCursor<ByteString, ByteString> source =
+        cursorOf(
+                       of(dnKey("ou=people"), entryId(1)),
+            of(dnKey("uid=user.0,ou=people"), entryId(2)));
+
+    // When
+    final Exception exception = validateDNs(source);
+
+    // Then
+    assertDirectoryExceptionThrown(exception, NO_SUCH_OBJECT);
+    assertThat(exception.getMessage()).contains("ou=people")
+                                      .doesNotContain("user.0");
+  }
+
+  private ByteString dnKey(String dn)
+  {
+    return dnToDNKey(DN.valueOf(dn), 0);
+  }
+
+  private static ByteString entryId(long id)
+  {
+    return new EntryID(id).toByteString();
+  }
+
+  /**
+   * Cursors completely through the source using the {@link DnValidationCursorDecorator}, which throws a
+   * {@link StorageRuntimeException} when it detects an invalid DN.
+   */
+  private Exception validateDNs(SequentialCursor<ByteString, ByteString> source)
+      throws DirectoryException, StorageRuntimeException
+  {
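+    // The mocked id2entry always throws, forcing getDnAsString() to fall back to
+    // DnKeyFormat.keyToDNString() when the decorator builds its error message.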
+    final ID2Entry id2entry = mock(ID2Entry.class);
+    when(id2entry.get((ReadableTransaction) any(), (EntryID) any())).thenThrow(new StorageRuntimeException("unused"));
+
+    // When
+    try
+    {
+      toPairs(new DnValidationCursorDecorator(source, id2entry, mock(ReadableTransaction.class)));
+      fail("Exception expected");
+      return null;
+    }
+    catch (Exception e)
+    {
+      return e;
+    }
+  }
+
+  private void assertDirectoryExceptionThrown(final Exception exception, ResultCode expectedResultCode)
+  {
+    assertThat(exception).isExactlyInstanceOf(StorageRuntimeException.class);
+    assertThat(exception.getCause()).isExactlyInstanceOf(DirectoryException.class);
+    assertThat(((DirectoryException) exception.getCause()).getResultCode()).isEqualTo(expectedResultCode);
+  }
+
+  @Test
+  public void testDnValidationThrowsOnOrphans() throws DirectoryException, StorageRuntimeException
+  {
+    // Given
+    final SequentialCursor<ByteString, ByteString> source =
+        cursorOf(
+                             of(dnKey(""), entryId(1)),
+                    of(dnKey("ou=people"), entryId(2)),
+         of(dnKey("uid=user.0,ou=people"), entryId(3)),
+           of(dnKey("uid=doh,ou=people1"), entryId(4)));
+
+    // When
+    final Exception exception = validateDNs(source);
+
+    // Then
+    assertDirectoryExceptionThrown(exception, NO_SUCH_OBJECT);
+    assertThat(exception.getMessage()).contains("uid=doh");
+  }
+
+  @Test
+  public void testDnValidationThrowsOnDuplicates() throws DirectoryException, StorageRuntimeException
+  {
+    // Given
+    final SequentialCursor<ByteString, ByteString> source =
+        cursorOf(
+                           of(dnKey(""), entryId(1)),
+                  of(dnKey("ou=people"), entryId(2)),
+       of(dnKey("uid=user.0,ou=people"), entryId(3)),
+       of(dnKey("uid=user.2,ou=people"), entryId(4)),
+       of(dnKey("uid=user.2,ou=people"), entryId(5)));
+
+    // When
+    final Exception exception = validateDNs(source);
+
+    // Then
+    assertDirectoryExceptionThrown(exception, ENTRY_ALREADY_EXISTS);
+    assertThat(exception.getMessage()).contains("uid=user.2");
+  }
+
   private static void testBufferImplementation(MemoryBuffer buffer)
   {
     final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4, 1 });
@@ -143,7 +245,6 @@
   }
 
   @Test
-  @SuppressWarnings(value = "unchecked")
   public void testCounterCollector()
   {
     final MeteredCursor<String, ByteString> source = cursorOf(
@@ -165,7 +266,6 @@
   }
 
   @Test
-  @SuppressWarnings(value = "unchecked")
   public void testEntryIDSetCollector()
   {
     final MeteredCursor<String, ByteString> source = cursorOf(
@@ -279,8 +379,8 @@
     long offset = 0;
     for (Chunk source : memoryChunks)
     {
-      final FileRegion region = new FileRegion(channel, offset, source.size());
-      try(final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
+      try (final FileRegion region = new FileRegion(channel, offset, source.size());
+          final SequentialCursor<ByteString, ByteString> cursor = source.flip())
+      {
         regions.add(Pair.of(offset, region.write(cursor)));
       }
       offset += source.size();
@@ -356,7 +456,8 @@
     return collection;
   }
 
-  private final static <K, V> MeteredCursor<K, V> cursorOf(@SuppressWarnings("unchecked") Pair<K, V>... pairs)
+  @SafeVarargs
+  private final static <K, V> MeteredCursor<K, V> cursorOf(Pair<K, V>... pairs)
   {
     return cursorOf(Arrays.asList(pairs));
   }

--
Gitblit v1.10.0