From 5111f7cbf41682bebb67bd0876818b9432961101 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 05 Sep 2016 09:33:27 +0000
Subject: [PATCH] OPENDJ-3263: import with DN validation on JE is using high disk space
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 508 +++++++++++++++++++++++----------------------
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java | 111 +++++++++
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java | 30 ++
3 files changed, 389 insertions(+), 260 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
index 998628d..50e5d31 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
@@ -11,11 +11,15 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
- * Copyright 2015 ForgeRock AS.
+ * Copyright 2015-2016 ForgeRock AS.
*/
package org.opends.server.backends.pluggable;
+import java.util.concurrent.ExecutionException;
+
+import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.backends.RebuildConfig;
+import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
@@ -28,20 +32,34 @@
* @param importConfig
* The configuration to use when performing the import
* @return Information about the result of the import processing
- * @throws Exception
+ * @throws InitializationException
+ * If a problem occurs during initialization
+ * @throws ConfigException
+ * If the configuration is invalid
+ * @throws InterruptedException
+ * If the import process has been interrupted
+ * @throws ExecutionException
* If a problem occurs while performing the LDIF import
* @see {@link Backend#importLDIF(LDIFImportConfig, ServerContext)}
*/
- LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception;
+ LDIFImportResult importLDIF(LDIFImportConfig importConfig)
+ throws InitializationException, ConfigException, InterruptedException, ExecutionException;
/**
* Rebuild indexes.
*
* @param rebuildConfig
* The configuration to sue when performing the rebuild.
- * @throws Exception
- * If a problem occurs while performing the rebuild.
+ * @throws InitializationException
+ * If a problem occurs during initialization
+ * @throws ConfigException
+ * If the configuration is invalid
+ * @throws InterruptedException
+ * If the rebuild process has been interrupted
+ * @throws ExecutionException
+ * If a problem occurs while performing the rebuild
* @see {@link Backend#rebuildIndex(RebuildConfig, ServerContext)}
*/
- void rebuildIndex(RebuildConfig rebuildConfig) throws Exception;
+ void rebuildIndex(RebuildConfig rebuildConfig)
+ throws InitializationException, ConfigException, InterruptedException, ExecutionException;
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index e46e160..b38f931 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -36,6 +36,7 @@
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
+import static org.forgerock.opendj.ldap.ResultCode.*;
import java.io.Closeable;
import java.io.File;
@@ -58,7 +59,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -98,14 +99,13 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
-import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.spi.Indexer;
-import org.forgerock.util.Reject;
-import org.forgerock.util.Utils;
-import org.forgerock.util.promise.PromiseImpl;
import org.forgerock.opendj.server.config.meta.BackendIndexCfgDefn.IndexType;
import org.forgerock.opendj.server.config.server.BackendIndexCfg;
import org.forgerock.opendj.server.config.server.PluggableBackendCfg;
+import org.forgerock.util.Reject;
+import org.forgerock.util.Utils;
+import org.forgerock.util.promise.PromiseImpl;
import org.opends.server.api.CompressedSchema;
import org.opends.server.backends.RebuildConfig;
import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
@@ -180,7 +180,8 @@
}
@Override
- public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
+ public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
+ throws InitializationException, ConfigException, InterruptedException, ExecutionException
{
logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
@@ -196,45 +197,49 @@
final OnDiskMergeImporter importer;
final ExecutorService sorter =
Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
- final LDIFReaderSource source =
- new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
- final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
- try (final Importer dbStorage = rootContainer.getStorage().startImport())
+ try (final LDIFReaderSource source =
+ new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount))
{
- final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
- final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
- ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
- : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
-
- importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
- importer.doImport(source);
- }
- finally
- {
- sorter.shutdownNow();
- if (OperatingSystem.isWindows())
+ final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
+ try (final Importer dbStorage = rootContainer.getStorage().startImport())
{
- // Try to force the JVM to close mmap()ed file so that they can be deleted.
- // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
- System.gc();
- Runtime.getRuntime().runFinalization();
+ final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
+ final AbstractTwoPhaseImportStrategy importStrategy =
+ new ExternalSortAndImportStrategy(entryContainers, dbStorage, tempDir, bufferPool, sorter);
+ importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
+ importer.doImport(source);
}
- recursiveDelete(tempDir);
- }
- logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
- / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
+ finally
+ {
+ sorter.shutdownNow();
+ if (OperatingSystem.isWindows())
+ {
+ // Try to force the JVM to close mmap()ed file so that they can be deleted.
+ // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
+ System.gc();
+ Runtime.getRuntime().runFinalization();
+ }
+ recursiveDelete(tempDir);
+ }
+ logger.info(NOTE_IMPORT_PHASE_STATS,
+ importer.getTotalTimeInMillis() / 1000,
+ importer.getPhaseOneTimeInMillis() / 1000,
+ importer.getPhaseTwoTimeInMillis() / 1000);
- final long importTime = System.currentTimeMillis() - startTime;
- float rate = 0;
- if (importTime > 0)
- {
- rate = 1000f * source.getEntriesRead() / importTime;
+ final long importTime = System.currentTimeMillis() - startTime;
+ float rate = 0;
+ if (importTime > 0)
+ {
+ rate = 1000f * source.getEntriesRead() / importTime;
+ }
+ logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
+ .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
+ return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source.getEntriesIgnored());
}
- logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
- .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
-
- return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
- .getEntriesIgnored());
+ }
+ catch (IOException e)
+ {
+ throw new ExecutionException(e);
}
}
@@ -265,17 +270,26 @@
}
@Override
- public void rebuildIndex(final RebuildConfig rebuildConfig) throws Exception
+ public void rebuildIndex(final RebuildConfig rebuildConfig)
+ throws InitializationException, ExecutionException, ConfigException, InterruptedException
{
final EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
- final long totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>()
+ final long totalEntries;
+ try
{
- @Override
- public Long run(ReadableTransaction txn) throws Exception
+ totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>()
{
- return entryContainer.getID2Entry().getRecordCount(txn);
- }
- });
+ @Override
+ public Long run(ReadableTransaction txn) throws Exception
+ {
+ return entryContainer.getID2Entry().getRecordCount(txn);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ throw new ExecutionException(e);
+ }
final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries);
if (rebuildConfig.isClearDegradedState())
@@ -289,20 +303,28 @@
}
}
- private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes) throws Exception
+ private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes)
+ throws ExecutionException
{
- rootContainer.getStorage().write(new WriteOperation()
+ try
{
- @Override
- public void run(WriteableTransaction txn) throws Exception
+ rootContainer.getStorage().write(new WriteOperation()
{
- visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn)));
- }
- });
+ @Override
+ public void run(WriteableTransaction txn)
+ {
+ visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn)));
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ throw new ExecutionException(e);
+ }
}
private void rebuildIndex(EntryContainer entryContainer, String tmpDirectory, Set<String> indexesToRebuild,
- long totalEntries) throws Exception
+ long totalEntries) throws InitializationException, ConfigException, InterruptedException, ExecutionException
{
if (indexesToRebuild.isEmpty())
{
@@ -593,7 +615,7 @@
}
/** Source of LDAP {@link Entry}s to process. */
- private interface Source
+ private interface Source extends Closeable
{
/** Process {@link Entry}s extracted from a {@link Source}. */
interface EntryProcessor
@@ -601,7 +623,7 @@
void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws Exception;
}
- void processAllEntries(EntryProcessor processor) throws Exception;
+ void processAllEntries(EntryProcessor processor) throws InterruptedException, ExecutionException;
boolean isCancelled();
}
@@ -632,7 +654,13 @@
}
@Override
- public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
+ public void close()
+ {
+ closeSilently(reader);
+ }
+
+ @Override
+ public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException
{
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
@@ -792,12 +820,17 @@
}
@Override
- public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
+ public void close() {
+ executor.shutdown();
+ }
+
+ @Override
+ public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException
{
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS);
- final PromiseImpl<Void, Exception> promise = PromiseImpl.create();
+ final PromiseImpl<Void, ExecutionException> promise = PromiseImpl.create();
final ID2Entry id2Entry = entryContainer.getID2Entry();
try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(id2Entry.getName()))
{
@@ -816,11 +849,16 @@
new EntryID(key), id2Entry.entryFromDatabase(value, schema));
nbEntriesProcessed.incrementAndGet();
}
- catch (Exception e)
+ catch (ExecutionException e)
{
interrupted = true;
promise.handleException(e);
}
+ catch (Exception e)
+ {
+ interrupted = true;
+ promise.handleException(new ExecutionException(e));
+ }
}
});
}
@@ -835,7 +873,7 @@
// Forward exception if any
if (promise.isDone())
{
- promise.getOrThrow(0, TimeUnit.SECONDS);
+ promise.getOrThrow();
}
}
@@ -904,7 +942,7 @@
this.importStrategy = importStrategy;
}
- private void doImport(final Source source) throws Exception
+ private void doImport(final Source source) throws InterruptedException, ExecutionException
{
final long phaseOneStartTime = System.currentTimeMillis();
final PhaseOneWriteableTransaction transaction = new PhaseOneWriteableTransaction(importStrategy);
@@ -913,36 +951,42 @@
final ConcurrentMap<EntryContainer, CountDownLatch> importedContainers = new ConcurrentHashMap<>();
// Start phase one
- source.processAllEntries(new Source.EntryProcessor()
+ try
{
- @Override
- public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
- InterruptedException
+ source.processAllEntries(new Source.EntryProcessor()
{
- CountDownLatch latch = importedContainers.get(container);
- if (latch == null)
+ @Override
+ public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
+ InterruptedException
{
- final CountDownLatch newLatch = new CountDownLatch(1);
- if (importedContainers.putIfAbsent(container, newLatch) == null)
+ CountDownLatch latch = importedContainers.get(container);
+ if (latch == null)
{
- try
+ final CountDownLatch newLatch = new CountDownLatch(1);
+ if (importedContainers.putIfAbsent(container, newLatch) == null)
{
- importStrategy.beforePhaseOne(container);
+ try
+ {
+ importStrategy.beforePhaseOne(container);
+ }
+ finally
+ {
+ newLatch.countDown();
+ }
}
- finally
- {
- newLatch.countDown();
- }
+ latch = importedContainers.get(container);
}
- latch = importedContainers.get(container);
- }
- latch.await();
+ latch.await();
- importStrategy.validate(container, entryID, entry);
- container.importEntry(transaction, entryID, entry);
- importedCount.incrementAndGet();
- }
- });
+ container.importEntry(transaction, entryID, entry);
+ importedCount.incrementAndGet();
+ }
+ });
+ }
+ finally
+ {
+ closeSilently(source);
+ }
phaseOneTimeMs = System.currentTimeMillis() - phaseOneStartTime;
if (source.isCancelled())
@@ -1023,8 +1067,6 @@
this.sorter = sorter;
}
- abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
-
void beforePhaseOne(EntryContainer entryContainer)
{
entryContainer.delete(asWriteableTransaction(importer));
@@ -1057,13 +1099,14 @@
}
final Callable<Void> newDN2IDImporterTask(TreeName treeName, final Chunk source,
- PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported)
+ PhaseTwoProgressReporter progressReporter)
{
final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
+ final ID2Entry id2entry = entryContainer.getID2Entry();
final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
- return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), source,
- id2count, newPhaseTwoCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
+ return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, id2entry, entryContainer.getDN2ID(),
+ source, id2count, newPhaseTwoCollector(entryContainer, id2count.getName()));
}
final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source,
@@ -1091,25 +1134,20 @@
}
/**
- * No validation is performed, every {@link TreeName} (but id2entry) are imported into dedicated
- * {@link ExternalSortChunk} before being imported into the {@link Importer}. id2entry which is directly copied into
- * the database through {@link ImporterToChunkAdapter}.
+ * During phase one, import all {@link TreeName} (but id2entry) into a dedicated and temporary
+ * {@link ExternalSortChunk} which will sort the keys in the ascending order. Phase two will copy the sorted keys into
+ * the database using the {@link Importer}. id2entry database is imported directly into the database using
+ * {@link ImporterToChunkAdapter}.
*/
- private static final class SortAndImportWithoutDNValidation extends AbstractTwoPhaseImportStrategy
+ private static final class ExternalSortAndImportStrategy extends AbstractTwoPhaseImportStrategy
{
- SortAndImportWithoutDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
+ ExternalSortAndImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
BufferPool bufferPool, Executor sorter)
{
super(entryContainers, importer, tempDir, bufferPool, sorter);
}
@Override
- public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry)
- {
- // No validation performed. All entries are considered valid.
- }
-
- @Override
public Chunk newChunk(TreeName treeName) throws Exception
{
if (isID2Entry(treeName))
@@ -1131,7 +1169,7 @@
}
else if (isDN2ID(treeName))
{
- return newDN2IDImporterTask(treeName, source, progressReporter, false);
+ return newDN2IDImporterTask(treeName, source, progressReporter);
}
else if (isVLVIndex(entryContainer, treeName))
{
@@ -1141,96 +1179,6 @@
}
}
- /**
- * This strategy performs two validations by ensuring that there is no duplicate entry (entry with same DN) and that
- * the given entry has an existing parent. To do so, the dn2id is directly imported into the database in addition of
- * id2entry. Others tree are externally sorted before being imported into the database.
- */
- private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements
- ReadableTransaction
- {
- private static final int DN_CACHE_SIZE = 16;
- private final LRUPresenceCache<DN> dnCache = new LRUPresenceCache<>(DN_CACHE_SIZE);
-
- SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
- BufferPool bufferPool, Executor sorter)
- {
- super(entryContainers, importer, tempDir, bufferPool, sorter);
- }
-
- @Override
- public Chunk newChunk(TreeName treeName) throws Exception
- {
- if (isID2Entry(treeName))
- {
- return new MostlyOrderedChunk(asChunk(treeName, importer));
- }
- else if (isDN2ID(treeName))
- {
- return asChunk(treeName, importer);
- }
- return newExternalSortChunk(treeName);
- }
-
- @Override
- public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk source,
- PhaseTwoProgressReporter progressReporter)
- {
- final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
-
- if (isID2Entry(treeName))
- {
- return newFlushTask(source);
- }
- else if (isDN2ID(treeName))
- {
- return newDN2IDImporterTask(treeName, source, progressReporter, true);
- }
- else if (isVLVIndex(entryContainer, treeName))
- {
- return newVLVIndexImporterTask(getVLVIndex(entryContainer, treeName), source, progressReporter);
- }
- return newChunkCopierTask(treeName, source, progressReporter);
- }
-
- @Override
- public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
- {
- final DN2ID dn2Id = entryContainer.getDN2ID();
- final DN entryDN = entry.getName();
- final DN parentDN = entryContainer.getParentWithinBase(entryDN);
-
- if (parentDN != null && !dnCache.contains(parentDN) && dn2Id.get(this, parentDN) == null)
- {
- throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
- }
-
- if (dn2Id.get(this, entryDN) != null)
- {
- throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry));
- }
- dnCache.add(entryDN);
- }
-
- @Override
- public ByteString read(TreeName treeName, ByteSequence key)
- {
- return importer.read(treeName, key);
- }
-
- @Override
- public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getRecordCount(TreeName treeName)
- {
- throw new UnsupportedOperationException();
- }
- }
-
/** Import only a specific indexes list while ignoring everything else. */
private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy
{
@@ -1241,7 +1189,7 @@
{
super(entryContainers, importer, tempDir, bufferPool, sorter);
this.indexesToRebuild = new HashSet<>(indexNames.size());
- for(String indexName : indexNames)
+ for (String indexName : indexNames)
{
this.indexesToRebuild.add(indexName.toLowerCase());
}
@@ -1280,7 +1228,7 @@
{
if (isDN2ID(treeName))
{
- return newDN2IDImporterTask(treeName, source, progressReporter, false);
+ return newDN2IDImporterTask(treeName, source, progressReporter);
}
else if (isVLVIndex(entryContainer, treeName))
{
@@ -1291,12 +1239,6 @@
// Do nothing (flush null chunk)
return newFlushTask(source);
}
-
- @Override
- public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
- {
- // No validation performed. All entries are considered valid.
- }
}
private static <V> List<V> invokeParallel(String threadNameTemplate, Collection<Callable<V>> tasks)
@@ -1566,7 +1508,7 @@
throw new StorageRuntimeException(e);
}
- return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
+ final CompositeCursor<ByteString, ByteString> cursor = new CompositeCursor<ByteString, ByteString>(name, cursors)
{
@Override
public void close()
@@ -1587,7 +1529,10 @@
}
closeSilently(channel);
}
- }, (Collector<?, ByteString>) phaseTwoDeduplicator);
+ };
+ return phaseTwoDeduplicator != null
+ ? new CollectorCursor<>(cursor, (Collector<?, ByteString>) phaseTwoDeduplicator)
+ : cursor;
}
@Override
@@ -1625,8 +1570,9 @@
checkThreadNotInterrupted();
final int regionSize;
try (final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
- final SequentialCursor<ByteString, ByteString> source =
- new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
+ final SequentialCursor<ByteString, ByteString> source = phaseOneDeduplicator != null
+ ? new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator)
+ : chunk.flip())
{
regionSize = region.write(source);
}
@@ -2352,6 +2298,7 @@
private final Importer importer;
private final File tempDir;
private final BufferPool bufferPool;
+ private final ID2Entry id2entry;
private final DN2ID dn2id;
private final ID2ChildrenCount id2count;
private final Collector<?, ByteString> id2countCollector;
@@ -2359,18 +2306,19 @@
private final Chunk dn2IdDestination;
DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool,
- DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count, Collector<?, ByteString> id2countCollector,
- boolean dn2idAlreadyImported)
+ ID2Entry id2Entry, DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count,
+ Collector<?, ByteString> id2countCollector)
{
this.reporter = progressReporter;
this.importer = importer;
this.tempDir = tempDir;
this.bufferPool = bufferPool;
+ this.id2entry = id2Entry;
this.dn2id = dn2id;
this.dn2IdSourceChunk = dn2IdChunk;
this.id2count = id2count;
this.id2countCollector = id2countCollector;
- this.dn2IdDestination = dn2idAlreadyImported ? nullChunk() : asChunk(dn2id.getName(), importer);
+ this.dn2IdDestination = asChunk(dn2id.getName(), importer);
}
@Override
@@ -2381,17 +2329,20 @@
id2countCollector, sameThreadExecutor());
long totalNumberOfEntries = 0;
- final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
- try (final MeteredCursor<ByteString, ByteString> chunkCursor = dn2IdSourceChunk.flip();
- final SequentialCursor<ByteString, ByteString> dn2idCursor =
- dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
+ final TreeVisitor<ChildrenCount> childrenCountVisitor =
+ new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
+ try (final SequentialCursor<ByteString, ByteString> chunkCursor =
+ trackCursorProgress(reporter, dn2IdSourceChunk.flip());
+ final DnValidationCursorDecorator validatorCursor =
+ new DnValidationCursorDecorator(chunkCursor, id2entry, asWriteableTransaction(importer));
+ final SequentialCursor<ByteString, ByteString> dn2idCursor =
+ dn2id.openCursor(validatorCursor, childrenCountVisitor))
{
- checkThreadNotInterrupted();
while (dn2idCursor.next())
{
+ checkThreadNotInterrupted();
dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
totalNumberOfEntries++;
- checkThreadNotInterrupted();
}
}
id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2445,6 +2396,95 @@
}
}
+ /**
+ * Throw a {@link StorageRuntimeException} when a duplicate or orphan DNs is detected. DNs returned by the decorated
+ * cursor must be sorted.
+ */
+ static final class DnValidationCursorDecorator extends
+ SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString>
+ {
+ private final LinkedList<ByteString> parentDns = new LinkedList<>();
+ private final ID2Entry id2entry;
+ private final ReadableTransaction txn;
+
+ DnValidationCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, ID2Entry id2entry,
+ ReadableTransaction txn)
+ {
+ super(delegate);
+ this.id2entry = id2entry;
+ this.txn = txn;
+ }
+
+ @Override
+ public boolean next()
+ {
+ if (!delegate.next())
+ {
+ return false;
+ }
+ final ByteString dn = delegate.getKey();
+ try
+ {
+ throwIfDuplicate(dn);
+ throwIfOrphan(dn);
+ }
+ catch (DirectoryException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ parentDns.add(dn);
+ return true;
+ }
+
+ private void throwIfDuplicate(ByteString dn) throws DirectoryException
+ {
+ if (dn.equals(parentDns.peekLast()))
+ {
+ throw new DirectoryException(ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(getDnAsString()));
+ }
+ }
+
+ private String getDnAsString() {
+ try
+ {
+ return id2entry.get(txn, new EntryID(delegate.getValue())).getName().toString();
+ }
+ catch (Exception e)
+ {
+ return DnKeyFormat.keyToDNString(delegate.getKey());
+ }
+ }
+
+ private void throwIfOrphan(ByteString dn) throws DirectoryException
+ {
+ if (!parentExists(dn)) {
+ throw new DirectoryException(NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(getDnAsString()));
+ }
+ }
+
+ private boolean parentExists(ByteString childDn)
+ {
+ final Iterator<ByteString> it = parentDns.descendingIterator();
+ int i = parentDns.size();
+ while (it.hasNext())
+ {
+ if (DnKeyFormat.isChild(it.next(), childDn))
+ {
+ if (i < parentDns.size())
+ {
+ // Reset the last element in the stack to be the parentDn:
+ // (removes siblings, nephews, grand-nephews, etc. of childDn)
+ parentDns.subList(i, parentDns.size()).clear();
+ }
+ return true;
+ }
+ i--;
+ }
+ // First DN must represent the base-dn which is encoded as an empty ByteString.
+ return parentDns.isEmpty() && childDn.isEmpty();
+ }
+ }
+
private static Importer asImporter(Chunk chunk)
{
return new ChunkToImporterAdapter(chunk);
@@ -3037,7 +3077,12 @@
// key conflicts == sum values
return ID2ChildrenCount.getSumLongCollectorInstance();
}
- else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
+ else if (isDN2ID(treeName))
+ {
+ // Detection of duplicate DN will be performed during phase 2 by the DNImporterTask
+ return null;
+ }
+ else if (isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
{
// key conflicts == exception
return UniqueValueCollector.getInstance();
@@ -3681,41 +3726,6 @@
}
}
- /**
- * Thread-safe fixed-size cache which, once full, remove the least recently accessed entry. Composition is used here
- * to ensure that only methods generating entry-access in the LinkedHashMap are actually used. Otherwise, the least
- * recently used property of the cache would not be respected.
- */
- private static final class LRUPresenceCache<T>
- {
- private final Map<T, Object> cache;
-
- LRUPresenceCache(final int maxEntries)
- {
- // +1 because newly added entry is added before the least recently one is removed.
- this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
- {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
- {
- return size() >= maxEntries;
- }
- });
- }
-
- public boolean contains(T object)
- {
- return cache.get(object) != null;
- }
-
- public void add(T object)
- {
- cache.put(object, null);
- }
- }
-
private static WriteableTransaction asWriteableTransaction(Importer importer)
{
return new ImporterToWriteableTransactionAdapter(importer);
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index 65b6101..947b3f4 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -17,6 +17,10 @@
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.backends.pluggable.EntryIDSet.*;
+import static org.forgerock.util.Pair.of;
+import static org.mockito.Mockito.*;
+import static org.opends.server.backends.pluggable.DnKeyFormat.dnToDNKey;
+import static org.forgerock.opendj.ldap.ResultCode.*;
import java.io.File;
import java.nio.ByteBuffer;
@@ -38,6 +42,8 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.util.Pair;
import org.mockito.Mockito;
import org.opends.server.DirectoryServerTestCase;
@@ -46,6 +52,7 @@
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.DnValidationCursorDecorator;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.EntryIDSetsCollector;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor;
@@ -60,6 +67,7 @@
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.crypto.CryptoSuite;
+import org.opends.server.types.DirectoryException;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.PackedLong;
@@ -78,6 +86,100 @@
testBufferImplementation(new MemoryBuffer(ByteBuffer.allocateDirect(1024)));
}
+ @Test
+ public void testDnValidationThrowsOnMissingBaseDn() throws DirectoryException
+ {
+ // Given
+ final SequentialCursor<ByteString, ByteString> source =
+ cursorOf(
+ of(dnKey("ou=people"), entryId(1)),
+ of(dnKey("uid=user.0,ou=people"), entryId(2)));
+
+ // When
+ final Exception exception = validateDNs(source);
+
+ // Then
+ assertDirectoryExceptionThrown(exception, NO_SUCH_OBJECT);
+ assertThat(exception.getMessage()).contains("ou=people")
+ .doesNotContain("user.0");
+ }
+
+ private ByteString dnKey(String dn)
+ {
+ return dnToDNKey(DN.valueOf(dn), 0);
+ }
+
+ private static ByteString entryId(long id) {
+ return new EntryID(id).toByteString();
+ }
+
+ /**
+ * Cursor completely the source using the {@link DnValidationCursorDecorator} which will throw a
+ * {@link StorageRuntimeException} when detecting an invalid DN.
+ */
+ private Exception validateDNs(SequentialCursor<ByteString, ByteString> source)
+ throws DirectoryException, StorageRuntimeException
+ {
+ final ID2Entry id2entry = mock(ID2Entry.class);
+ when(id2entry.get((ReadableTransaction) any(), (EntryID) any())).thenThrow(new StorageRuntimeException("unused"));
+
+ // When
+ try
+ {
+ toPairs(new DnValidationCursorDecorator(source, id2entry, mock(ReadableTransaction.class)));
+ fail("Exception expected");
+ return null;
+ }
+ catch (Exception e)
+ {
+ return e;
+ }
+ }
+
+ private void assertDirectoryExceptionThrown(final Exception exception, ResultCode expectedResultCode)
+ {
+ assertThat(exception).isExactlyInstanceOf(StorageRuntimeException.class);
+ assertThat(exception.getCause()).isExactlyInstanceOf(DirectoryException.class);
+ assertThat(((DirectoryException) exception.getCause()).getResultCode()).isEqualTo(expectedResultCode);
+ }
+
+ @Test
+ public void testDnValidationThrowsOnOrphans() throws DirectoryException, StorageRuntimeException {
+ // Given
+ final SequentialCursor<ByteString, ByteString> source =
+ cursorOf(
+ of(dnKey(""), entryId(1)),
+ of(dnKey("ou=people"), entryId(2)),
+ of(dnKey("uid=user.0,ou=people"), entryId(3)),
+ of(dnKey("uid=doh,ou=people1"), entryId(4)));
+
+ // When
+ final Exception exception = validateDNs(source);
+
+ // Then
+ assertDirectoryExceptionThrown(exception, NO_SUCH_OBJECT);
+ assertThat(exception.getMessage()).contains("uid=doh");
+ }
+
+ @Test
+ public void testDnValidationThrowsOnDuplicates() throws DirectoryException, StorageRuntimeException {
+ // Given
+ final SequentialCursor<ByteString, ByteString> source =
+ cursorOf(
+ of(dnKey(""), entryId(1)),
+ of(dnKey("ou=people"), entryId(2)),
+ of(dnKey("uid=user.0,ou=people"), entryId(3)),
+ of(dnKey("uid=user.2,ou=people"), entryId(4)),
+ of(dnKey("uid=user.2,ou=people"), entryId(5)));
+
+ // When
+ final Exception exception = validateDNs(source);
+
+ // Then
+ assertDirectoryExceptionThrown(exception, ENTRY_ALREADY_EXISTS);
+ assertThat(exception.getMessage()).contains("uid=user.2");
+ }
+
private static void testBufferImplementation(MemoryBuffer buffer)
{
final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4, 1 });
@@ -143,7 +245,6 @@
}
@Test
- @SuppressWarnings(value = "unchecked")
public void testCounterCollector()
{
final MeteredCursor<String, ByteString> source = cursorOf(
@@ -165,7 +266,6 @@
}
@Test
- @SuppressWarnings(value = "unchecked")
public void testEntryIDSetCollector()
{
final MeteredCursor<String, ByteString> source = cursorOf(
@@ -279,8 +379,8 @@
long offset = 0;
for (Chunk source : memoryChunks)
{
- final FileRegion region = new FileRegion(channel, offset, source.size());
- try(final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
+ try(final FileRegion region = new FileRegion(channel, offset, source.size());
+ final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
regions.add(Pair.of(offset, region.write(cursor)));
}
offset += source.size();
@@ -356,7 +456,8 @@
return collection;
}
- private final static <K, V> MeteredCursor<K, V> cursorOf(@SuppressWarnings("unchecked") Pair<K, V>... pairs)
+ @SafeVarargs
+ private final static <K, V> MeteredCursor<K, V> cursorOf(Pair<K, V>... pairs)
{
return cursorOf(Arrays.asList(pairs));
}
--
Gitblit v1.10.0