mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
25.05.2016 5111f7cbf41682bebb67bd0876818b9432961101
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
@@ -11,11 +11,15 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2015 ForgeRock AS.
 * Copyright 2015-2016 ForgeRock AS.
 */
package org.opends.server.backends.pluggable;
import java.util.concurrent.ExecutionException;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.backends.RebuildConfig;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
@@ -28,20 +32,34 @@
   * @param importConfig
   *          The configuration to use when performing the import
   * @return Information about the result of the import processing
   * @throws Exception
   * @throws InitializationException
   *           If a problem occurs during initialization
   * @throws ConfigException
   *           If the configuration is invalid
   * @throws InterruptedException
   *           If the import process has been interrupted
   * @throws ExecutionException
   *           If a problem occurs while performing the LDIF import
   * @see Backend#importLDIF(LDIFImportConfig, ServerContext)
   */
  LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception;
  LDIFImportResult importLDIF(LDIFImportConfig importConfig)
      throws InitializationException, ConfigException, InterruptedException, ExecutionException;
  /**
   * Rebuild indexes.
   *
   * @param rebuildConfig
   *          The configuration to use when performing the rebuild.
   * @throws Exception
   *           If a problem occurs while performing the rebuild.
   * @throws InitializationException
   *           If a problem occurs during initialization
   * @throws ConfigException
   *           If the configuration is invalid
   * @throws InterruptedException
   *           If the rebuild process has been interrupted
   * @throws ExecutionException
   *           If a problem occurs while performing the rebuild
   * @see Backend#rebuildIndex(RebuildConfig, ServerContext)
   */
  void rebuildIndex(RebuildConfig rebuildConfig) throws Exception;
  void rebuildIndex(RebuildConfig rebuildConfig)
      throws InitializationException, ConfigException, InterruptedException, ExecutionException;
}
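
For orientation, here is how a hypothetical caller in the same package might consume the narrowed contract above; ImportRunner, the RuntimeException wrapping, and the message strings are illustrative, not part of the commit:

import java.util.concurrent.ExecutionException;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;

final class ImportRunner
{
  private final ImportStrategy strategy;

  ImportRunner(ImportStrategy strategy)
  {
    this.strategy = strategy;
  }

  LDIFImportResult run(LDIFImportConfig config) throws InitializationException, ConfigException
  {
    try
    {
      // Each failure mode now surfaces as a distinct checked exception
      // instead of a blanket "throws Exception".
      return strategy.importLDIF(config);
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt(); // restore the interrupt flag before surfacing the failure
      throw new RuntimeException("import interrupted", e);
    }
    catch (ExecutionException e)
    {
      // The interesting failure is usually the wrapped cause.
      throw new RuntimeException("import failed", e.getCause());
    }
  }
}
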
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -36,6 +36,7 @@
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
import static org.forgerock.opendj.ldap.ResultCode.*;
import java.io.Closeable;
import java.io.File;
@@ -58,7 +59,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -98,14 +99,13 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.spi.Indexer;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.promise.PromiseImpl;
import org.forgerock.opendj.server.config.meta.BackendIndexCfgDefn.IndexType;
import org.forgerock.opendj.server.config.server.BackendIndexCfg;
import org.forgerock.opendj.server.config.server.PluggableBackendCfg;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.promise.PromiseImpl;
import org.opends.server.api.CompressedSchema;
import org.opends.server.backends.RebuildConfig;
import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
@@ -180,7 +180,8 @@
    }
    @Override
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
        throws InitializationException, ConfigException, InterruptedException, ExecutionException
    {
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
@@ -196,45 +197,49 @@
        final OnDiskMergeImporter importer;
        final ExecutorService sorter =
            Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
        final LDIFReaderSource source =
            new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
        final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
        try (final Importer dbStorage = rootContainer.getStorage().startImport())
        try (final LDIFReaderSource source =
            new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount))
        {
          final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
          final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
              ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
              : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
          importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
          importer.doImport(source);
        }
        finally
        {
          sorter.shutdownNow();
          if (OperatingSystem.isWindows())
          final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
          try (final Importer dbStorage = rootContainer.getStorage().startImport())
          {
            // Try to force the JVM to close mmap()ed file so that they can be deleted.
            // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
            System.gc();
            Runtime.getRuntime().runFinalization();
            final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
            final AbstractTwoPhaseImportStrategy importStrategy =
                new ExternalSortAndImportStrategy(entryContainers, dbStorage, tempDir, bufferPool, sorter);
            importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
            importer.doImport(source);
          }
          recursiveDelete(tempDir);
        }
        logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
            / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
          finally
          {
            sorter.shutdownNow();
            if (OperatingSystem.isWindows())
            {
              // Try to force the JVM to close mmap()ed file so that they can be deleted.
              // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
              System.gc();
              Runtime.getRuntime().runFinalization();
            }
            recursiveDelete(tempDir);
          }
          logger.info(NOTE_IMPORT_PHASE_STATS,
                      importer.getTotalTimeInMillis() / 1000,
                      importer.getPhaseOneTimeInMillis() / 1000,
                      importer.getPhaseTwoTimeInMillis() / 1000);
        final long importTime = System.currentTimeMillis() - startTime;
        float rate = 0;
        if (importTime > 0)
        {
          rate = 1000f * source.getEntriesRead() / importTime;
          final long importTime = System.currentTimeMillis() - startTime;
          float rate = 0;
          if (importTime > 0)
          {
            rate = 1000f * source.getEntriesRead() / importTime;
          }
          logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
              .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
          return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source.getEntriesIgnored());
        }
        logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
            .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
        return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
            .getEntriesIgnored());
      }
      catch (IOException e)
      {
        throw new ExecutionException(e);
      }
    }
@@ -265,17 +270,26 @@
    }
    @Override
    public void rebuildIndex(final RebuildConfig rebuildConfig) throws Exception
    public void rebuildIndex(final RebuildConfig rebuildConfig)
        throws InitializationException, ExecutionException, ConfigException, InterruptedException
    {
      final EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
      final long totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>()
      final long totalEntries;
      try
      {
        @Override
        public Long run(ReadableTransaction txn) throws Exception
        totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>()
        {
          return entryContainer.getID2Entry().getRecordCount(txn);
        }
      });
          @Override
          public Long run(ReadableTransaction txn) throws Exception
          {
            return entryContainer.getID2Entry().getRecordCount(txn);
          }
        });
      }
      catch (Exception e)
      {
        throw new ExecutionException(e);
      }
      final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries);
      if (rebuildConfig.isClearDegradedState())
@@ -289,20 +303,28 @@
      }
    }
    private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes) throws Exception
    private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes)
        throws ExecutionException
    {
      rootContainer.getStorage().write(new WriteOperation()
      try
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
        rootContainer.getStorage().write(new WriteOperation()
        {
          visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn)));
        }
      });
          @Override
          public void run(WriteableTransaction txn)
          {
            visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn)));
          }
        });
      }
      catch (Exception e)
      {
        throw new ExecutionException(e);
      }
    }
    private void rebuildIndex(EntryContainer entryContainer, String tmpDirectory, Set<String> indexesToRebuild,
        long totalEntries) throws Exception
        long totalEntries) throws InitializationException, ConfigException, InterruptedException, ExecutionException
    {
      if (indexesToRebuild.isEmpty())
      {
@@ -593,7 +615,7 @@
  }
  /** Source of LDAP {@link Entry}s to process. */
  private interface Source
  private interface Source extends Closeable
  {
    /** Process {@link Entry}s extracted from a {@link Source}. */
    interface EntryProcessor
@@ -601,7 +623,7 @@
      void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws Exception;
    }
    void processAllEntries(EntryProcessor processor) throws Exception;
    void processAllEntries(EntryProcessor processor) throws InterruptedException, ExecutionException;
    boolean isCancelled();
  }
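
Since Source now extends Closeable, an implementation must release its resources even when processing aborts. A minimal in-memory sketch of the contract (shape only: Source is a private nested interface, and ListSource together with its container and entry map are hypothetical):

import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.opends.server.types.Entry;

/** Minimal single-threaded Source sketch; the container and entries are assumed inputs. */
final class ListSource implements Source
{
  private final EntryContainer container;
  private final Map<EntryID, Entry> entries;
  private volatile boolean cancelled;

  ListSource(EntryContainer container, Map<EntryID, Entry> entries)
  {
    this.container = container;
    this.entries = entries;
  }

  @Override
  public void processAllEntries(EntryProcessor processor) throws InterruptedException, ExecutionException
  {
    for (Map.Entry<EntryID, Entry> e : entries.entrySet())
    {
      if (cancelled || Thread.currentThread().isInterrupted())
      {
        throw new InterruptedException();
      }
      try
      {
        processor.processEntry(container, e.getKey(), e.getValue());
      }
      catch (Exception ex)
      {
        // Mirror the contract above: any other failure is reported as an ExecutionException.
        cancelled = true;
        throw new ExecutionException(ex);
      }
    }
  }

  @Override
  public boolean isCancelled()
  {
    return cancelled;
  }

  @Override
  public void close()
  {
    // Nothing to release for an in-memory source.
  }
}
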
@@ -632,7 +654,13 @@
    }
    @Override
    public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
    public void close()
    {
      closeSilently(reader);
    }
    @Override
    public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException
    {
      final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
@@ -792,12 +820,17 @@
    }
    @Override
    public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
    public void close()
    {
      executor.shutdown();
    }
    @Override
    public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException
    {
      final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
      scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS);
      final PromiseImpl<Void, Exception> promise = PromiseImpl.create();
      final PromiseImpl<Void, ExecutionException> promise = PromiseImpl.create();
      final ID2Entry id2Entry = entryContainer.getID2Entry();
      try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(id2Entry.getName()))
      {
@@ -816,11 +849,16 @@
                    new EntryID(key), id2Entry.entryFromDatabase(value, schema));
                nbEntriesProcessed.incrementAndGet();
              }
              catch (Exception e)
              catch (ExecutionException e)
              {
                interrupted = true;
                promise.handleException(e);
              }
              catch (Exception e)
              {
                interrupted = true;
                promise.handleException(new ExecutionException(e));
              }
            }
          });
        }
@@ -835,7 +873,7 @@
      // Forward exception if any
      if (promise.isDone())
      {
        promise.getOrThrow(0, TimeUnit.SECONDS);
        promise.getOrThrow();
      }
    }
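
The worker/promise dance above is easy to miss in the diff: workers record the first failure as an ExecutionException, and the coordinating thread re-throws it after the pool drains. Stripped to its shape (a sketch; FirstFailureDemo and runAll are illustrative, only PromiseImpl.create/handleException/isDone/getOrThrow come from the forgerock promise API used in the patch):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.forgerock.util.promise.PromiseImpl;

final class FirstFailureDemo
{
  static void runAll(Runnable... tasks) throws InterruptedException, ExecutionException
  {
    final PromiseImpl<Void, ExecutionException> promise = PromiseImpl.create();
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    for (final Runnable task : tasks)
    {
      pool.execute(new Runnable()
      {
        @Override
        public void run()
        {
          try
          {
            task.run();
          }
          catch (Exception e)
          {
            // Only the first recorded failure is kept by the promise.
            promise.handleException(new ExecutionException(e));
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    // Re-throw the recorded failure, if any, on the caller's thread.
    if (promise.isDone())
    {
      promise.getOrThrow();
    }
  }
}
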
@@ -904,7 +942,7 @@
    this.importStrategy = importStrategy;
  }
  private void doImport(final Source source) throws Exception
  private void doImport(final Source source) throws InterruptedException, ExecutionException
  {
    final long phaseOneStartTime = System.currentTimeMillis();
    final PhaseOneWriteableTransaction transaction = new PhaseOneWriteableTransaction(importStrategy);
@@ -913,36 +951,42 @@
    final ConcurrentMap<EntryContainer, CountDownLatch> importedContainers = new ConcurrentHashMap<>();
    // Start phase one
    source.processAllEntries(new Source.EntryProcessor()
    try
    {
      @Override
      public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
          InterruptedException
      source.processAllEntries(new Source.EntryProcessor()
      {
        CountDownLatch latch = importedContainers.get(container);
        if (latch == null)
        @Override
        public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
            InterruptedException
        {
          final CountDownLatch newLatch = new CountDownLatch(1);
          if (importedContainers.putIfAbsent(container, newLatch) == null)
          CountDownLatch latch = importedContainers.get(container);
          if (latch == null)
          {
            try
            final CountDownLatch newLatch = new CountDownLatch(1);
            if (importedContainers.putIfAbsent(container, newLatch) == null)
            {
              importStrategy.beforePhaseOne(container);
              try
              {
                importStrategy.beforePhaseOne(container);
              }
              finally
              {
                newLatch.countDown();
              }
            }
            finally
            {
              newLatch.countDown();
            }
            latch = importedContainers.get(container);
          }
          latch = importedContainers.get(container);
        }
        latch.await();
          latch.await();
        importStrategy.validate(container, entryID, entry);
        container.importEntry(transaction, entryID, entry);
        importedCount.incrementAndGet();
      }
    });
          container.importEntry(transaction, entryID, entry);
          importedCount.incrementAndGet();
        }
      });
    }
    finally
    {
      closeSilently(source);
    }
    phaseOneTimeMs = System.currentTimeMillis() - phaseOneStartTime;
    if (source.isCancelled())
@@ -1023,8 +1067,6 @@
      this.sorter = sorter;
    }
    abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
    void beforePhaseOne(EntryContainer entryContainer)
    {
      entryContainer.delete(asWriteableTransaction(importer));
@@ -1057,13 +1099,14 @@
    }
    final Callable<Void> newDN2IDImporterTask(TreeName treeName, final Chunk source,
        PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported)
        PhaseTwoProgressReporter progressReporter)
    {
      final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
      final ID2Entry id2entry = entryContainer.getID2Entry();
      final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
      return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), source,
          id2count, newPhaseTwoCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
      return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, id2entry, entryContainer.getDN2ID(),
          source, id2count, newPhaseTwoCollector(entryContainer, id2count.getName()));
    }
    final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source,
@@ -1091,25 +1134,20 @@
  }
  /**
   * No validation is performed, every {@link TreeName} (but id2entry) are imported into dedicated
   * {@link ExternalSortChunk} before being imported into the {@link Importer}. id2entry which is directly copied into
   * the database through {@link ImporterToChunkAdapter}.
   * During phase one, import every {@link TreeName} (except id2entry) into a dedicated and temporary
   * {@link ExternalSortChunk}, which sorts the keys in ascending order. Phase two copies the sorted keys into
   * the database using the {@link Importer}. The id2entry tree is imported directly into the database using
   * {@link ImporterToChunkAdapter}.
   */
  private static final class SortAndImportWithoutDNValidation extends AbstractTwoPhaseImportStrategy
  private static final class ExternalSortAndImportStrategy extends AbstractTwoPhaseImportStrategy
  {
    SortAndImportWithoutDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
    ExternalSortAndImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
        BufferPool bufferPool, Executor sorter)
    {
      super(entryContainers, importer, tempDir, bufferPool, sorter);
    }
    @Override
    public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry)
    {
      // No validation performed. All entries are considered valid.
    }
    @Override
    public Chunk newChunk(TreeName treeName) throws Exception
    {
      if (isID2Entry(treeName))
@@ -1131,7 +1169,7 @@
      }
      else if (isDN2ID(treeName))
      {
        return newDN2IDImporterTask(treeName, source, progressReporter, false);
        return newDN2IDImporterTask(treeName, source, progressReporter);
      }
      else if (isVLVIndex(entryContainer, treeName))
      {
@@ -1141,96 +1179,6 @@
    }
  }
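
The renamed ExternalSortAndImportStrategy follows the textbook external-sort shape described in its javadoc: phase one produces sorted runs, phase two performs a k-way merge. A toy version over plain strings (illustrative only; it shares nothing with the actual ExternalSortChunk buffer and file-region format):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class ExternalSortSketch
{
  /** Phase one: cut the input into independently sorted in-memory runs. */
  static List<List<String>> sortedRuns(Iterator<String> input, int runSize)
  {
    final List<List<String>> runs = new ArrayList<>();
    List<String> run = new ArrayList<>(runSize);
    while (input.hasNext())
    {
      run.add(input.next());
      if (run.size() == runSize)
      {
        Collections.sort(run);
        runs.add(run);
        run = new ArrayList<>(runSize);
      }
    }
    if (!run.isEmpty())
    {
      Collections.sort(run);
      runs.add(run);
    }
    return runs;
  }

  /** Phase two: k-way merge of the sorted runs into one ascending sequence. */
  static List<String> merge(List<List<String>> runs)
  {
    // Each heap entry is { current head value, iterator it came from }.
    final PriorityQueue<Object[]> heap =
        new PriorityQueue<>(Math.max(1, runs.size()), new Comparator<Object[]>()
        {
          @Override
          public int compare(Object[] a, Object[] b)
          {
            return ((String) a[0]).compareTo((String) b[0]);
          }
        });
    for (List<String> run : runs)
    {
      final Iterator<String> it = run.iterator();
      if (it.hasNext())
      {
        heap.add(new Object[] { it.next(), it });
      }
    }
    final List<String> merged = new ArrayList<>();
    while (!heap.isEmpty())
    {
      final Object[] head = heap.poll();
      merged.add((String) head[0]);
      @SuppressWarnings("unchecked")
      final Iterator<String> it = (Iterator<String>) head[1];
      if (it.hasNext())
      {
        heap.add(new Object[] { it.next(), it });
      }
    }
    return merged;
  }
}
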
  /**
   * This strategy performs two validations: it ensures that there is no duplicate entry (entry with the same DN) and
   * that each entry has an existing parent. To do so, dn2id is imported directly into the database in addition to
   * id2entry. Other trees are externally sorted before being imported into the database.
   */
  private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements
      ReadableTransaction
  {
    private static final int DN_CACHE_SIZE = 16;
    private final LRUPresenceCache<DN> dnCache = new LRUPresenceCache<>(DN_CACHE_SIZE);
    SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
        BufferPool bufferPool, Executor sorter)
    {
      super(entryContainers, importer, tempDir, bufferPool, sorter);
    }
    @Override
    public Chunk newChunk(TreeName treeName) throws Exception
    {
      if (isID2Entry(treeName))
      {
        return new MostlyOrderedChunk(asChunk(treeName, importer));
      }
      else if (isDN2ID(treeName))
      {
        return asChunk(treeName, importer);
      }
      return newExternalSortChunk(treeName);
    }
    @Override
    public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk source,
        PhaseTwoProgressReporter progressReporter)
    {
      final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
      if (isID2Entry(treeName))
      {
        return newFlushTask(source);
      }
      else if (isDN2ID(treeName))
      {
        return newDN2IDImporterTask(treeName, source, progressReporter, true);
      }
      else if (isVLVIndex(entryContainer, treeName))
      {
        return newVLVIndexImporterTask(getVLVIndex(entryContainer, treeName), source, progressReporter);
      }
      return newChunkCopierTask(treeName, source, progressReporter);
    }
    @Override
    public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
    {
      final DN2ID dn2Id = entryContainer.getDN2ID();
      final DN entryDN = entry.getName();
      final DN parentDN = entryContainer.getParentWithinBase(entryDN);
      if (parentDN != null && !dnCache.contains(parentDN) && dn2Id.get(this, parentDN) == null)
      {
        throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
      }
      if (dn2Id.get(this, entryDN) != null)
      {
        throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry));
      }
      dnCache.add(entryDN);
    }
    @Override
    public ByteString read(TreeName treeName, ByteSequence key)
    {
      return importer.read(treeName, key);
    }
    @Override
    public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
    {
      throw new UnsupportedOperationException();
    }
    @Override
    public long getRecordCount(TreeName treeName)
    {
      throw new UnsupportedOperationException();
    }
  }
  /** Import only a specific indexes list while ignoring everything else. */
  private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy
  {
@@ -1241,7 +1189,7 @@
    {
      super(entryContainers, importer, tempDir, bufferPool, sorter);
      this.indexesToRebuild = new HashSet<>(indexNames.size());
      for(String indexName : indexNames)
      for (String indexName : indexNames)
      {
        this.indexesToRebuild.add(indexName.toLowerCase());
      }
@@ -1280,7 +1228,7 @@
      {
        if (isDN2ID(treeName))
        {
          return newDN2IDImporterTask(treeName, source, progressReporter, false);
          return newDN2IDImporterTask(treeName, source, progressReporter);
        }
        else if (isVLVIndex(entryContainer, treeName))
        {
@@ -1291,12 +1239,6 @@
      // Do nothing (flush null chunk)
      return newFlushTask(source);
    }
    @Override
    public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
    {
      // No validation performed. All entries are considered valid.
    }
  }
  private static <V> List<V> invokeParallel(String threadNameTemplate, Collection<Callable<V>> tasks)
@@ -1566,7 +1508,7 @@
        throw new StorageRuntimeException(e);
      }
      return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
      final CompositeCursor<ByteString, ByteString> cursor = new CompositeCursor<ByteString, ByteString>(name, cursors)
      {
        @Override
        public void close()
@@ -1587,7 +1529,10 @@
          }
          closeSilently(channel);
        }
      }, (Collector<?, ByteString>) phaseTwoDeduplicator);
      };
      return phaseTwoDeduplicator != null
          ? new CollectorCursor<>(cursor, (Collector<?, ByteString>) phaseTwoDeduplicator)
          : cursor;
    }
    @Override
@@ -1625,8 +1570,9 @@
          checkThreadNotInterrupted();
          final int regionSize;
          try (final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
               final SequentialCursor<ByteString, ByteString> source =
                new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
               final SequentialCursor<ByteString, ByteString> source = phaseOneDeduplicator != null
                     ? new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator)
                     : chunk.flip())
          {
            regionSize = region.write(source);
          }
@@ -2352,6 +2298,7 @@
    private final Importer importer;
    private final File tempDir;
    private final BufferPool bufferPool;
    private final ID2Entry id2entry;
    private final DN2ID dn2id;
    private final ID2ChildrenCount id2count;
    private final Collector<?, ByteString> id2countCollector;
@@ -2359,18 +2306,19 @@
    private final Chunk dn2IdDestination;
    DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool,
        DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count, Collector<?, ByteString> id2countCollector,
        boolean dn2idAlreadyImported)
        ID2Entry id2Entry, DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count,
        Collector<?, ByteString> id2countCollector)
    {
      this.reporter = progressReporter;
      this.importer = importer;
      this.tempDir = tempDir;
      this.bufferPool = bufferPool;
      this.id2entry = id2Entry;
      this.dn2id = dn2id;
      this.dn2IdSourceChunk = dn2IdChunk;
      this.id2count = id2count;
      this.id2countCollector = id2countCollector;
      this.dn2IdDestination = dn2idAlreadyImported ? nullChunk() : asChunk(dn2id.getName(), importer);
      this.dn2IdDestination = asChunk(dn2id.getName(), importer);
    }
    @Override
@@ -2381,17 +2329,20 @@
              id2countCollector, sameThreadExecutor());
      long totalNumberOfEntries = 0;
      final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
      try (final MeteredCursor<ByteString, ByteString> chunkCursor = dn2IdSourceChunk.flip();
          final SequentialCursor<ByteString, ByteString> dn2idCursor =
              dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
      final TreeVisitor<ChildrenCount> childrenCountVisitor =
          new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
      try (final SequentialCursor<ByteString, ByteString> chunkCursor =
               trackCursorProgress(reporter, dn2IdSourceChunk.flip());
           final DnValidationCursorDecorator validatorCursor =
               new DnValidationCursorDecorator(chunkCursor, id2entry, asWriteableTransaction(importer));
           final SequentialCursor<ByteString, ByteString> dn2idCursor =
               dn2id.openCursor(validatorCursor, childrenCountVisitor))
      {
        checkThreadNotInterrupted();
        while (dn2idCursor.next())
        {
          checkThreadNotInterrupted();
          dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
          totalNumberOfEntries++;
          checkThreadNotInterrupted();
        }
      }
      id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2445,6 +2396,95 @@
    }
  }
  /**
   * Throws a {@link StorageRuntimeException} when a duplicate or orphan DN is detected. DNs returned by the decorated
   * cursor must be sorted.
   */
  static final class DnValidationCursorDecorator extends
      SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString>
  {
    private final LinkedList<ByteString> parentDns = new LinkedList<>();
    private final ID2Entry id2entry;
    private final ReadableTransaction txn;
    DnValidationCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, ID2Entry id2entry,
        ReadableTransaction txn)
    {
      super(delegate);
      this.id2entry = id2entry;
      this.txn = txn;
    }
    @Override
    public boolean next()
    {
      if (!delegate.next())
      {
        return false;
      }
      final ByteString dn = delegate.getKey();
      try
      {
        throwIfDuplicate(dn);
        throwIfOrphan(dn);
      }
      catch (DirectoryException e)
      {
        throw new StorageRuntimeException(e);
      }
      parentDns.add(dn);
      return true;
    }
    private void throwIfDuplicate(ByteString dn) throws DirectoryException
    {
      if (dn.equals(parentDns.peekLast()))
      {
        throw new DirectoryException(ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(getDnAsString()));
      }
    }
    private String getDnAsString()
    {
      try
      {
        return id2entry.get(txn, new EntryID(delegate.getValue())).getName().toString();
      }
      catch (Exception e)
      {
        return DnKeyFormat.keyToDNString(delegate.getKey());
      }
    }
    private void throwIfOrphan(ByteString dn) throws DirectoryException
    {
      if (!parentExists(dn))
      {
        throw new DirectoryException(NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(getDnAsString()));
      }
    }
    private boolean parentExists(ByteString childDn)
    {
      final Iterator<ByteString> it = parentDns.descendingIterator();
      int i = parentDns.size();
      while (it.hasNext())
      {
        if (DnKeyFormat.isChild(it.next(), childDn))
        {
          if (i < parentDns.size())
          {
            // Reset the last element in the stack to be the parentDn:
            // (removes siblings, nephews, grand-nephews, etc. of childDn)
            parentDns.subList(i, parentDns.size()).clear();
          }
          return true;
        }
        i--;
      }
      // First DN must represent the base-dn which is encoded as an empty ByteString.
      return parentDns.isEmpty() && childDn.isEmpty();
    }
  }
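
Because dn2id keys arrive in sorted order, the decorator never needs a full DN cache: parentDns only ever holds the current ancestor chain. A worked trace over hypothetical DNs (the base DN is encoded as the empty key):

    incoming key              parentDns after the step
    ""                        [""]
    ou=people                 ["", ou=people]
    uid=a,ou=people           ["", ou=people, uid=a,ou=people]
    uid=b,ou=people           ["", ou=people, uid=b,ou=people]    (uid=a truncated: a sibling, not an ancestor)
    uid=doh,ou=people1        rejected with NO_SUCH_OBJECT: no stack entry is its direct parent

A duplicate key would be caught even earlier, since it necessarily equals parentDns.peekLast().
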
  private static Importer asImporter(Chunk chunk)
  {
    return new ChunkToImporterAdapter(chunk);
@@ -3037,7 +3077,12 @@
      // key conflicts == sum values
      return ID2ChildrenCount.getSumLongCollectorInstance();
    }
    else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
    else if (isDN2ID(treeName))
    {
      // Detection of duplicate DNs will be performed during phase 2 by the DN2IDImporterTask
      return null;
    }
    else if (isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
    {
      // key conflicts == exception
      return UniqueValueCollector.getInstance();
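
The collector selected here encodes the merge policy applied when the external sort meets the same key twice; conceptually (a generic sketch, not the OpenDJ Collector interface):

/** Merge policy applied when the external sort meets the same key twice (conceptual sketch only). */
interface DuplicateKeyPolicy<V>
{
  V merge(V existing, V duplicate);
}

/** id2childrenCount: conflicting values are summed. */
final class SumLongs implements DuplicateKeyPolicy<Long>
{
  @Override
  public Long merge(Long existing, Long duplicate)
  {
    return existing + duplicate;
  }
}

/** dn2uri and VLV indexes: a conflict is a hard error. */
final class RejectDuplicates<V> implements DuplicateKeyPolicy<V>
{
  @Override
  public V merge(V existing, V duplicate)
  {
    throw new IllegalStateException("unexpected duplicate key");
  }
}

Returning null for dn2id opts out of phase-one deduplication entirely, so that DN2IDImporterTask can turn duplicates into proper LDAP errors during phase two (see the DnValidationCursorDecorator above).
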
@@ -3681,41 +3726,6 @@
    }
  }
  /**
   * Thread-safe fixed-size cache which, once full, removes the least recently accessed entry. Composition is used here
   * to ensure that only methods generating an entry access in the LinkedHashMap are actually used. Otherwise, the
   * least-recently-used property of the cache would not be respected.
   */
  private static final class LRUPresenceCache<T>
  {
    private final Map<T, Object> cache;
    LRUPresenceCache(final int maxEntries)
    {
      // +1 because the newly added entry is inserted before the least recently used one is removed.
      this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
      {
        private static final long serialVersionUID = 1L;
        @Override
        protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
        {
          return size() >= maxEntries;
        }
      });
    }
    public boolean contains(T object)
    {
      return cache.get(object) != null;
    }
    public void add(T object)
    {
      cache.put(object, null);
    }
  }
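
The removed cache leaned on LinkedHashMap's access-order mode. A standalone, JDK-only demonstration of that eviction behavior (using the common size() > maxEntries idiom; the removed code instead sized the map with maxEntries + 1 and compared with >=):

import java.util.LinkedHashMap;
import java.util.Map;

final class LruDemo
{
  public static void main(String[] args)
  {
    final int maxEntries = 3;
    final Map<String, Object> cache = new LinkedHashMap<String, Object>(maxEntries + 1, 1.0f, true)
    {
      private static final long serialVersionUID = 1L;

      @Override
      protected boolean removeEldestEntry(Map.Entry<String, Object> eldest)
      {
        return size() > maxEntries;
      }
    };
    cache.put("a", null);
    cache.put("b", null);
    cache.put("c", null);
    cache.get("a");       // the access refreshes "a"
    cache.put("d", null); // evicts "b", the least recently accessed entry
    System.out.println(cache.keySet()); // prints [c, a, d]
  }
}
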
  private static WriteableTransaction asWriteableTransaction(Importer importer)
  {
    return new ImporterToWriteableTransactionAdapter(importer);
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -17,6 +17,10 @@
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.backends.pluggable.EntryIDSet.*;
import static org.forgerock.util.Pair.of;
import static org.mockito.Mockito.*;
import static org.opends.server.backends.pluggable.DnKeyFormat.dnToDNKey;
import static org.forgerock.opendj.ldap.ResultCode.*;
import java.io.File;
import java.nio.ByteBuffer;
@@ -38,6 +42,8 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.util.Pair;
import org.mockito.Mockito;
import org.opends.server.DirectoryServerTestCase;
@@ -46,6 +52,7 @@
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.DnValidationCursorDecorator;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.EntryIDSetsCollector;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor;
@@ -60,6 +67,7 @@
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.crypto.CryptoSuite;
import org.opends.server.types.DirectoryException;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.PackedLong;
@@ -78,6 +86,100 @@
    testBufferImplementation(new MemoryBuffer(ByteBuffer.allocateDirect(1024)));
  }
  @Test
  public void testDnValidationThrowsOnMissingBaseDn() throws DirectoryException
  {
    // Given
    final SequentialCursor<ByteString, ByteString> source =
        cursorOf(
                       of(dnKey("ou=people"), entryId(1)),
            of(dnKey("uid=user.0,ou=people"), entryId(2)));
    // When
    final Exception exception = validateDNs(source);
    // Then
    assertDirectoryExceptionThrown(exception, NO_SUCH_OBJECT);
    assertThat(exception.getMessage()).contains("ou=people")
                                      .doesNotContain("user.0");
  }
  private ByteString dnKey(String dn)
  {
    return dnToDNKey(DN.valueOf(dn), 0);
  }
  private static ByteString entryId(long id)
  {
    return new EntryID(id).toByteString();
  }
  /**
   * Cursors completely through the source using the {@link DnValidationCursorDecorator}, which throws a
   * {@link StorageRuntimeException} when it detects an invalid DN.
   */
  private Exception validateDNs(SequentialCursor<ByteString, ByteString> source)
      throws DirectoryException, StorageRuntimeException
  {
    final ID2Entry id2entry = mock(ID2Entry.class);
    when(id2entry.get((ReadableTransaction) any(), (EntryID) any())).thenThrow(new StorageRuntimeException("unused"));
    // When
    try
    {
      toPairs(new DnValidationCursorDecorator(source, id2entry, mock(ReadableTransaction.class)));
      fail("Exception expected");
      return null;
    }
    catch (Exception e)
    {
      return e;
    }
  }
  private void assertDirectoryExceptionThrown(final Exception exception, ResultCode expectedResultCode)
  {
    assertThat(exception).isExactlyInstanceOf(StorageRuntimeException.class);
    assertThat(exception.getCause()).isExactlyInstanceOf(DirectoryException.class);
    assertThat(((DirectoryException) exception.getCause()).getResultCode()).isEqualTo(expectedResultCode);
  }
  @Test
  public void testDnValidationThrowsOnOrphans() throws DirectoryException, StorageRuntimeException
  {
    // Given
    final SequentialCursor<ByteString, ByteString> source =
        cursorOf(
                             of(dnKey(""), entryId(1)),
                    of(dnKey("ou=people"), entryId(2)),
         of(dnKey("uid=user.0,ou=people"), entryId(3)),
           of(dnKey("uid=doh,ou=people1"), entryId(4)));
    // When
    final Exception exception = validateDNs(source);
    // Then
    assertDirectoryExceptionThrown(exception, NO_SUCH_OBJECT);
    assertThat(exception.getMessage()).contains("uid=doh");
  }
  @Test
  public void testDnValidationThrowsOnDuplicates() throws DirectoryException, StorageRuntimeException
  {
    // Given
    final SequentialCursor<ByteString, ByteString> source =
        cursorOf(
                           of(dnKey(""), entryId(1)),
                  of(dnKey("ou=people"), entryId(2)),
       of(dnKey("uid=user.0,ou=people"), entryId(3)),
       of(dnKey("uid=user.2,ou=people"), entryId(4)),
       of(dnKey("uid=user.2,ou=people"), entryId(5)));
    // When
    final Exception exception = validateDNs(source);
    // Then
    assertDirectoryExceptionThrown(exception, ENTRY_ALREADY_EXISTS);
    assertThat(exception.getMessage()).contains("uid=user.2");
  }
  private static void testBufferImplementation(MemoryBuffer buffer)
  {
    final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4, 1 });
@@ -143,7 +245,6 @@
  }
  @Test
  @SuppressWarnings(value = "unchecked")
  public void testCounterCollector()
  {
    final MeteredCursor<String, ByteString> source = cursorOf(
@@ -165,7 +266,6 @@
  }
  @Test
  @SuppressWarnings(value = "unchecked")
  public void testEntryIDSetCollector()
  {
    final MeteredCursor<String, ByteString> source = cursorOf(
@@ -279,8 +379,8 @@
    long offset = 0;
    for (Chunk source : memoryChunks)
    {
      final FileRegion region = new FileRegion(channel, offset, source.size());
      try(final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
      try(final FileRegion region = new FileRegion(channel, offset, source.size());
          final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
        regions.add(Pair.of(offset, region.write(cursor)));
      }
      offset += source.size();
@@ -356,7 +456,8 @@
    return collection;
  }
  private final static <K, V> MeteredCursor<K, V> cursorOf(@SuppressWarnings("unchecked") Pair<K, V>... pairs)
  @SafeVarargs
  private final static <K, V> MeteredCursor<K, V> cursorOf(Pair<K, V>... pairs)
  {
    return cursorOf(Arrays.asList(pairs));
  }
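
The final hunk swaps a call-site @SuppressWarnings for @SafeVarargs on the declaration, which is the standard fix for the unchecked-generic-array warning on a generic varargs method. A minimal standalone illustration (SafeVarargsDemo and listOf are hypothetical):

import java.util.Arrays;
import java.util.List;

final class SafeVarargsDemo
{
  // @SafeVarargs is allowed on static and final methods: the varargs array cannot escape unsafely.
  @SafeVarargs
  static <T> List<T> listOf(T... items)
  {
    return Arrays.asList(items);
  }

  public static void main(String[] args)
  {
    System.out.println(listOf("a", "b", "c")); // no unchecked warning at the call site
  }
}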