From acf59627f06a7d061b3da8f61c60f6d2dc8d7a8d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 20 May 2015 12:11:06 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java | 8
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 1398 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 1,402 insertions(+), 4 deletions(-)
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
index 96a0a74..c3c2cac 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
@@ -319,9 +319,9 @@
this.dnCache = null;
}
- @SuppressWarnings("javadoc")
- OnDiskMergeBufferImporter(RootContainer rootContainer, LDIFImportConfig importCfg, PluggableBackendCfg backendCfg,
- ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException
+ private OnDiskMergeBufferImporter(RootContainer rootContainer, LDIFImportConfig importCfg,
+ PluggableBackendCfg backendCfg, ServerContext serverContext)
+ throws InitializationException, ConfigException, StorageRuntimeException
{
this.rootContainer = rootContainer;
this.rebuildManager = null;
@@ -3230,7 +3230,7 @@
* @param entryLimit
* The entry limit for the index.
*/
- private IndexKey(AttributeType attributeType, String indexID, int entryLimit)
+ IndexKey(AttributeType attributeType, String indexID, int entryLimit)
{
this.attributeType = attributeType;
this.indexID = indexID;
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
new file mode 100644
index 0000000..005b07c
--- /dev/null
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -0,0 +1,1398 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011-2015 ForgeRock AS
+ */
+package org.opends.server.backends.pluggable;
+
+import static org.opends.messages.BackendMessages.*;
+import static org.opends.server.backends.pluggable.DnKeyFormat.*;
+import static org.opends.server.core.DirectoryServer.*;
+import static org.opends.server.util.DynamicConstants.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
+import org.opends.server.admin.std.server.BackendIndexCfg;
+import org.opends.server.admin.std.server.PersistitBackendCfg;
+import org.opends.server.admin.std.server.PluggableBackendCfg;
+import org.opends.server.backends.persistit.PersistItStorage;
+import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
+import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
+import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.DNCache;
+import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.IndexKey;
+import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.ReadOperation;
+import org.opends.server.backends.pluggable.spi.ReadableTransaction;
+import org.opends.server.backends.pluggable.spi.Storage;
+import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
+import org.opends.server.backends.pluggable.spi.TreeName;
+import org.opends.server.backends.pluggable.spi.UpdateFunction;
+import org.opends.server.backends.pluggable.spi.WriteOperation;
+import org.opends.server.backends.pluggable.spi.WriteableTransaction;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ServerContext;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.LDIFImportResult;
+import org.opends.server.util.Platform;
+
+/**
+ * This class provides the engine that performs both importing of LDIF files and
+ * the rebuilding of indexes.
+ */
+final class OnDiskMergeStorageImporter
+{
+ /**
+ * Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting
+ * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
+ */
+ @SuppressWarnings("javadoc")
+ static final class StrategyImpl implements ImportStrategy
+ {
+ private final PluggableBackendCfg backendCfg;
+
+ StrategyImpl(PluggableBackendCfg backendCfg)
+ {
+ this.backendCfg = backendCfg;
+ }
+
+ @Override
+ public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer,
+ ServerContext serverContext) throws DirectoryException, InitializationException
+ {
+ try
+ {
+ return new OnDiskMergeStorageImporter(rootContainer, importConfig, backendCfg, serverContext).processImport();
+ }
+ catch (DirectoryException | InitializationException e)
+ {
+ logger.traceException(e);
+ throw e;
+ }
+ catch (ConfigException e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+ }
+ catch (Exception e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(),
+ LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
+ }
+ }
+ }
+
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private static final int TIMER_INTERVAL = 10000;
+ private static final String DEFAULT_TMP_DIR = "import-tmp";
+ private static final String DN_CACHE_DIR = "dn-cache";
+
+ /** Defaults for DB cache. */
+ private static final int MAX_DB_CACHE_SIZE = 8 * MB;
+ private static final int MAX_DB_LOG_SIZE = 10 * MB;
+ private static final int MIN_DB_CACHE_SIZE = 4 * MB;
+
+ /**
+ * Defaults for LDIF reader buffers, min memory required to import and default
+ * size for byte buffers.
+ */
+ private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
+ private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE + MAX_DB_LOG_SIZE;
+
+ /** Max size of phase one buffer. */
+ private static final int MAX_BUFFER_SIZE = 2 * MB;
+ /** Min size of phase one buffer. */
+ private static final int MIN_BUFFER_SIZE = 4 * KB;
+ /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
+ private static final int SMALL_HEAP_SIZE = 256 * MB;
+
+ /** Root container. */
+ private final RootContainer rootContainer;
+ /** Import configuration. */
+ private final LDIFImportConfig importCfg;
+ private final ServerContext serverContext;
+
+ /** LDIF reader. */
+ private ImportLDIFReader reader;
+ /** Phase one imported entries count. */
+ private final AtomicLong importCount = new AtomicLong(0);
+ /** Migrated entry count. */
+ private int migratedCount;
+
+ /** Phase one buffer size in bytes. */
+ private int bufferSize;
+ /** Index count. */
+ private final int indexCount;
+ /** Thread count. */
+ private int threadCount;
+
+ /** Whether DN validation should be performed. If true, then it is performed during phase one. */
+ private final boolean validateDNs;
+
+ /** Temp scratch directory. */
+ private final File tempDir;
+ /** DN cache used when DN validation is done in first phase. */
+ private final DNCache dnCache;
+ /** Size in bytes of DN cache. */
+ private long dnCacheSize;
+ /** Available memory at the start of the import. */
+ private long availableMemory;
+ /** Size in bytes of DB cache. */
+ private long dbCacheSize;
+
+ /** Map of DNs to Suffix objects. */
+ private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<>();
+
+ /** Set to true if the backend was cleared. */
+ private final boolean clearedBackend;
+
+ /** Used to shutdown import if an error occurs in phase one. */
+ private volatile boolean isCanceled;
+
+ /** Number of phase one buffers. */
+ private int phaseOneBufferCount;
+
+ private OnDiskMergeStorageImporter(RootContainer rootContainer, LDIFImportConfig importCfg,
+ PluggableBackendCfg backendCfg, ServerContext serverContext)
+ throws InitializationException, ConfigException, StorageRuntimeException
+ {
+ this.rootContainer = rootContainer;
+ this.importCfg = importCfg;
+ this.serverContext = serverContext;
+
+ if (importCfg.getThreadCount() == 0)
+ {
+ this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
+ }
+ else
+ {
+ this.threadCount = importCfg.getThreadCount();
+ }
+
+ // Determine the number of indexes.
+ this.indexCount = getTotalIndexCount(backendCfg);
+
+ this.clearedBackend = mustClearBackend(importCfg, backendCfg);
+
+ validateDNs = !importCfg.getSkipDNValidation();
+ this.tempDir = prepareTempDir(backendCfg, importCfg.getTmpDirectory());
+ // be careful: requires that a few data has been set
+ computeMemoryRequirements();
+
+ if (validateDNs)
+ {
+ final File dnCachePath = new File(tempDir, DN_CACHE_DIR);
+ dnCachePath.mkdirs();
+ this.dnCache = new DNCacheImpl(dnCachePath);
+ }
+ else
+ {
+ this.dnCache = null;
+ }
+ }
+
+ private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException
+ {
+ File parentDir = getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR);
+ File tempDir = new File(parentDir, backendCfg.getBackendId());
+ recursiveDelete(tempDir);
+ if (!tempDir.exists() && !tempDir.mkdirs())
+ {
+ throw new InitializationException(ERR_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
+ }
+ return tempDir;
+ }
+
+ /**
+ * Returns whether the backend must be cleared.
+ *
+ * @param importCfg
+ * the import configuration object
+ * @param backendCfg
+ * the backend configuration object
+ * @return true if the backend must be cleared, false otherwise
+ * @see #prepareSuffix(WriteableTransaction, EntryContainer) for per-suffix cleanups.
+ */
+ private static boolean mustClearBackend(LDIFImportConfig importCfg, PluggableBackendCfg backendCfg)
+ {
+ return !importCfg.appendToExistingData()
+ && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+ /*
+ * Why do we clear when there is only one baseDN?
+ * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
+ * so if there is only one baseDN for this backend, then clear it now.
+ */
+ }
+
+ private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException
+ {
+ int indexes = 2; // dn2id, dn2uri
+ for (String indexName : backendCfg.listBackendIndexes())
+ {
+ BackendIndexCfg index = backendCfg.getBackendIndex(indexName);
+ SortedSet<IndexType> types = index.getIndexType();
+ if (types.contains(IndexType.EXTENSIBLE))
+ {
+ indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
+ }
+ else
+ {
+ indexes += types.size();
+ }
+ }
+ return indexes;
+ }
+
+ /**
+ * Calculate buffer sizes and initialize properties based on memory.
+ *
+ * @throws InitializationException
+ * If a problem occurs during calculation.
+ */
+ private void computeMemoryRequirements() throws InitializationException
+ {
+ // Calculate amount of usable memory. This will need to take into account
+ // various fudge factors, including the number of IO buffers used by the
+ // scratch writers (1 per index).
+ calculateAvailableMemory();
+
+ final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
+
+ // We need caching when doing DN validation
+ if (validateDNs)
+ {
+ // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
+ if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
+ {
+ dbCacheSize = 500 * KB;
+ dnCacheSize = 500 * KB;
+ }
+ else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
+ {
+ dbCacheSize = MIN_DB_CACHE_SIZE;
+ dnCacheSize = MIN_DB_CACHE_SIZE;
+ }
+ else if (!clearedBackend)
+ {
+ // Appending to existing data so reserve extra memory for the DB cache
+ // since it will be needed for dn2id queries.
+ dbCacheSize = usableMemory * 33 / 100;
+ dnCacheSize = usableMemory * 33 / 100;
+ }
+ else
+ {
+ dbCacheSize = MAX_DB_CACHE_SIZE;
+ dnCacheSize = usableMemory * 66 / 100;
+ }
+ }
+ else
+ {
+ // No DN validation: calculate memory for DB cache and buffers.
+
+ // No need for DN2ID cache.
+ dnCacheSize = 0;
+
+ if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
+ {
+ dbCacheSize = 500 * KB;
+ }
+ else if (usableMemory < MIN_DB_CACHE_MEMORY)
+ {
+ dbCacheSize = MIN_DB_CACHE_SIZE;
+ }
+ else
+ {
+ // No need to differentiate between append/clear backend, since dn2id is
+ // not being queried.
+ dbCacheSize = MAX_DB_CACHE_SIZE;
+ }
+ }
+
+ final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize;
+ final int oldThreadCount = threadCount;
+ if (indexCount != 0) // Avoid / by zero
+ {
+ while (true)
+ {
+ phaseOneBufferCount = 2 * indexCount * threadCount;
+
+ // Scratch writers allocate 4 buffers per index as well.
+ final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount);
+ long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount;
+ // We need (2 * bufferSize) to fit in an int for the insertByteStream
+ // and deleteByteStream constructors.
+ bufferSize = (int) Math.min(longBufferSize, Integer.MAX_VALUE / 2);
+
+ if (bufferSize > MAX_BUFFER_SIZE)
+ {
+ if (validateDNs)
+ {
+ // The buffers are big enough: the memory is best used for the DN2ID temp DB
+ bufferSize = MAX_BUFFER_SIZE;
+
+ final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
+ if (!clearedBackend)
+ {
+ dbCacheSize += extraMemory / 2;
+ dnCacheSize += extraMemory / 2;
+ }
+ else
+ {
+ dnCacheSize += extraMemory;
+ }
+ }
+
+ break;
+ }
+ else if (bufferSize > MIN_BUFFER_SIZE)
+ {
+ // This is acceptable.
+ break;
+ }
+ else if (threadCount > 1)
+ {
+ // Retry using less threads.
+ threadCount--;
+ }
+ else
+ {
+ // Not enough memory.
+ final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
+ throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(
+ usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize));
+ }
+ }
+ }
+
+ if (oldThreadCount != threadCount)
+ {
+ logger.info(NOTE_IMPORT_ADJUST_THREAD_COUNT, oldThreadCount, threadCount);
+ }
+
+ logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
+ if (dnCacheSize > 0)
+ {
+ logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize);
+ }
+ logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
+ }
+
+ /**
+ * Calculates the amount of available memory which can be used by this import,
+ * taking into account whether or not the import is running offline or online
+ * as a task.
+ */
+ private void calculateAvailableMemory()
+ {
+ final long totalAvailableMemory;
+ if (DirectoryServer.isRunning())
+ {
+ // Online import/rebuild.
+ final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
+ totalAvailableMemory = Math.max(availableMemory, 16 * MB);
+ }
+ else
+ {
+ // Offline import/rebuild.
+ totalAvailableMemory = Platform.getUsableMemoryForCaching();
+ }
+
+ // Now take into account various fudge factors.
+ int importMemPct = 90;
+ if (totalAvailableMemory <= SMALL_HEAP_SIZE)
+ {
+ // Be pessimistic when memory is low.
+ importMemPct -= 25;
+ }
+
+ availableMemory = totalAvailableMemory * importMemPct / 100;
+ }
+
+ private void initializeSuffixes(WriteableTransaction txn) throws ConfigException, DirectoryException
+ {
+ for (EntryContainer ec : rootContainer.getEntryContainers())
+ {
+ Suffix suffix = getSuffix(txn, ec);
+ if (suffix != null)
+ {
+ dnSuffixMap.put(ec.getBaseDN(), suffix);
+ }
+ }
+ }
+
+ private Suffix getSuffix(WriteableTransaction txn, EntryContainer entryContainer)
+ throws ConfigException, DirectoryException
+ {
+ if (importCfg.appendToExistingData() || importCfg.clearBackend())
+ {
+ return new Suffix(entryContainer);
+ }
+
+ final DN baseDN = entryContainer.getBaseDN();
+ if (importCfg.getExcludeBranches().contains(baseDN))
+ {
+ // This entire base DN was explicitly excluded. Skip.
+ return null;
+ }
+
+ EntryContainer sourceEntryContainer = null;
+ List<DN> excludeBranches = getDescendants(baseDN, importCfg.getExcludeBranches());
+ List<DN> includeBranches = null;
+ if (!importCfg.getIncludeBranches().isEmpty())
+ {
+ includeBranches = getDescendants(baseDN, importCfg.getIncludeBranches());
+ if (includeBranches.isEmpty())
+ {
+ // There are no branches in the explicitly defined include list under this base DN.
+ // Skip this base DN altogether.
+ return null;
+ }
+
+ // Remove any overlapping include branches.
+ Iterator<DN> includeBranchIterator = includeBranches.iterator();
+ while (includeBranchIterator.hasNext())
+ {
+ DN includeDN = includeBranchIterator.next();
+ if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN))
+ {
+ includeBranchIterator.remove();
+ }
+ }
+
+ // Remove any exclude branches that are not are not under a include branch
+ // since they will be migrated as part of the existing entries
+ // outside of the include branches anyways.
+ Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
+ while (excludeBranchIterator.hasNext())
+ {
+ DN excludeDN = excludeBranchIterator.next();
+ if (!isAnyAncestorOf(includeBranches, excludeDN))
+ {
+ excludeBranchIterator.remove();
+ }
+ }
+
+ if (excludeBranches.isEmpty()
+ && includeBranches.size() == 1
+ && includeBranches.get(0).equals(baseDN))
+ {
+ // This entire base DN is explicitly included in the import with
+ // no exclude branches that we need to migrate.
+ // Just clear the entry container.
+ clearSuffix(entryContainer);
+ }
+ else
+ {
+ sourceEntryContainer = entryContainer;
+
+ // Create a temp entry container
+ DN tempDN = baseDN.child(DN.valueOf("dc=importTmp"));
+ entryContainer = rootContainer.openEntryContainer(tempDN, txn);
+ }
+ }
+ return new Suffix(entryContainer, sourceEntryContainer, includeBranches, excludeBranches);
+ }
+
+ private List<DN> getDescendants(DN baseDN, Set<DN> dns)
+ {
+ final List<DN> results = new ArrayList<>();
+ for (DN dn : dns)
+ {
+ if (baseDN.isAncestorOf(dn))
+ {
+ results.add(dn);
+ }
+ }
+ return results;
+ }
+
+ private static void clearSuffix(EntryContainer entryContainer)
+ {
+ entryContainer.lock();
+ entryContainer.clear();
+ entryContainer.unlock();
+ }
+
+ private static boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
+ {
+ for (DN dn : dns)
+ {
+ if (!dn.equals(childDN) && dn.isAncestorOf(childDN))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean isAnyAncestorOf(List<DN> dns, DN childDN)
+ {
+ for (DN dn : dns)
+ {
+ if (dn.isAncestorOf(childDN))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private LDIFImportResult processImport() throws Exception
+ {
+ try {
+ try
+ {
+ reader = new ImportLDIFReader(importCfg, rootContainer);
+ }
+ catch (IOException ioe)
+ {
+ throw new InitializationException(ERR_IMPORT_LDIF_READER_IO_ERROR.get(), ioe);
+ }
+
+ logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
+ logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
+
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ initializeSuffixes(txn);
+ setIndexesTrusted(txn, false);
+ }
+ });
+
+ final long startTime = System.currentTimeMillis();
+ importPhaseOne();
+ final long phaseOneFinishTime = System.currentTimeMillis();
+ if (validateDNs)
+ {
+ dnCache.close();
+ }
+
+ if (isCanceled)
+ {
+ throw new InterruptedException("Import processing canceled.");
+ }
+
+ final long phaseTwoTime = System.currentTimeMillis();
+ importPhaseTwo();
+ if (isCanceled)
+ {
+ throw new InterruptedException("Import processing canceled.");
+ }
+ final long phaseTwoFinishTime = System.currentTimeMillis();
+
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ setIndexesTrusted(txn, true);
+ switchEntryContainers(txn);
+ }
+ });
+ recursiveDelete(tempDir);
+ final long finishTime = System.currentTimeMillis();
+ final long importTime = finishTime - startTime;
+ logger.info(NOTE_IMPORT_PHASE_STATS, importTime / 1000,
+ (phaseOneFinishTime - startTime) / 1000,
+ (phaseTwoFinishTime - phaseTwoTime) / 1000);
+ float rate = 0;
+ if (importTime > 0)
+ {
+ rate = 1000f * reader.getEntriesRead() / importTime;
+ }
+ logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(),
+ reader.getEntriesIgnored(), reader.getEntriesRejected(),
+ migratedCount, importTime / 1000, rate);
+ return new LDIFImportResult(reader.getEntriesRead(),
+ reader.getEntriesRejected(), reader.getEntriesIgnored());
+ }
+ finally
+ {
+ close(reader);
+ if (validateDNs)
+ {
+ close(dnCache);
+ }
+ }
+ }
+
+ private void switchEntryContainers(WriteableTransaction txn) throws StorageRuntimeException, InitializationException
+ {
+ for (Suffix suffix : dnSuffixMap.values())
+ {
+ DN baseDN = suffix.getBaseDN();
+ EntryContainer entryContainer = suffix.getSrcEntryContainer();
+ if (entryContainer != null)
+ {
+ final EntryContainer toDelete = rootContainer.unregisterEntryContainer(baseDN);
+ toDelete.lock();
+ toDelete.close();
+ toDelete.delete(txn);
+ toDelete.unlock();
+
+ final EntryContainer replacement = suffix.getEntryContainer();
+ replacement.lock();
+ replacement.setTreePrefix(baseDN.toNormalizedUrlSafeString());
+ replacement.unlock();
+ rootContainer.registerEntryContainer(baseDN, replacement);
+ }
+ }
+ }
+
+ private void setIndexesTrusted(WriteableTransaction txn, boolean trusted) throws StorageRuntimeException
+ {
+ try
+ {
+ for (Suffix s : dnSuffixMap.values())
+ {
+ s.setIndexesTrusted(txn, trusted);
+ }
+ }
+ catch (StorageRuntimeException ex)
+ {
+ throw new StorageRuntimeException(NOTE_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()).toString());
+ }
+ }
+
+ /**
+ * Reads all entries from id2entry, and:
+ * <ol>
+ * <li>compute how the entry is indexed for each index</li>
+ * <li>store the result of indexing entries into in-memory index buffers</li>
+ * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
+ * The scratch files will be read by phaseTwo to perform on-disk merge</li>
+ * </ol>
+ */
+ private void importPhaseOne() throws Exception
+ {
+ final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+ final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
+
+ final Storage storage = rootContainer.getStorage();
+ execService.submit(new MigrateExistingTask(storage)).get();
+
+ final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
+ if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
+ {
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(new ImportTask(storage));
+ }
+ }
+ execService.invokeAll(tasks);
+ tasks.clear();
+
+ execService.submit(new MigrateExcludedTask(storage)).get();
+
+ shutdownAll(timerService, execService);
+ }
+
+ private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+ {
+ timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ private static void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+ {
+ for (ExecutorService executorService : executorServices)
+ {
+ executorService.shutdown();
+ }
+ for (ExecutorService executorService : executorServices)
+ {
+ executorService.awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+
+ private void importPhaseTwo() throws Exception
+ {
+ ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
+ try
+ {
+ // TODO JNR
+ }
+ finally
+ {
+ shutdownAll(timerService);
+ }
+ }
+
+ /** Task used to migrate excluded branch. */
+ private final class MigrateExcludedTask extends ImportTask
+ {
+ private MigrateExcludedTask(final Storage storage)
+ {
+ super(storage);
+ }
+
+ @Override
+ void call0(WriteableTransaction txn) throws Exception
+ {
+ for (Suffix suffix : dnSuffixMap.values())
+ {
+ EntryContainer entryContainer = suffix.getSrcEntryContainer();
+ if (entryContainer != null && !suffix.getExcludeBranches().isEmpty())
+ {
+ logger.info(NOTE_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN());
+ Cursor<ByteString, ByteString> cursor = txn.openCursor(entryContainer.getDN2ID().getName());
+ try
+ {
+ for (DN excludedDN : suffix.getExcludeBranches())
+ {
+ final ByteString key = dnToDNKey(excludedDN, suffix.getBaseDN().size());
+ boolean success = cursor.positionToKeyOrNext(key);
+ if (success && key.equals(cursor.getKey()))
+ {
+ /*
+ * This is the base entry for a branch that was excluded in the
+ * import so we must migrate all entries in this branch over to
+ * the new entry container.
+ */
+ ByteStringBuilder end = afterKey(key);
+
+ while (success
+ && ByteSequence.COMPARATOR.compare(key, end) < 0
+ && !importCfg.isCancelled()
+ && !isCanceled)
+ {
+ EntryID id = new EntryID(cursor.getValue());
+ Entry entry = entryContainer.getID2Entry().get(txn, id);
+ processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
+ migratedCount++;
+ success = cursor.next();
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+ }
+ }
+ }
+
+ /** Task to migrate existing entries. */
+ private final class MigrateExistingTask extends ImportTask
+ {
+ private MigrateExistingTask(final Storage storage)
+ {
+ super(storage);
+ }
+
+ @Override
+ void call0(WriteableTransaction txn) throws Exception
+ {
+ for (Suffix suffix : dnSuffixMap.values())
+ {
+ EntryContainer entryContainer = suffix.getSrcEntryContainer();
+ if (entryContainer != null && !suffix.getIncludeBranches().isEmpty())
+ {
+ logger.info(NOTE_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN());
+ Cursor<ByteString, ByteString> cursor = txn.openCursor(entryContainer.getDN2ID().getName());
+ try
+ {
+ final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
+ boolean success = cursor.next();
+ while (success
+ && !importCfg.isCancelled()
+ && !isCanceled)
+ {
+ final ByteString key = cursor.getKey();
+ if (!includeBranches.contains(key))
+ {
+ EntryID id = new EntryID(key);
+ Entry entry = entryContainer.getID2Entry().get(txn, id);
+ processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
+ migratedCount++;
+ success = cursor.next();
+ }
+ else
+ {
+ /*
+ * This is the base entry for a branch that will be included
+ * in the import so we do not want to copy the branch to the
+ * new entry container.
+ */
+ /*
+ * Advance the cursor to next entry at the same level in the DIT
+ * skipping all the entries in this branch.
+ */
+ ByteStringBuilder begin = afterKey(key);
+ success = cursor.positionToKeyOrNext(begin);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+ }
+ }
+
+ private List<ByteString> includeBranchesAsBytes(Suffix suffix)
+ {
+ List<ByteString> includeBranches = new ArrayList<>(suffix.getIncludeBranches().size());
+ for (DN includeBranch : suffix.getIncludeBranches())
+ {
+ if (includeBranch.isDescendantOf(suffix.getBaseDN()))
+ {
+ includeBranches.add(dnToDNKey(includeBranch, suffix.getBaseDN().size()));
+ }
+ }
+ return includeBranches;
+ }
+ }
+
+ /**
+ * This task performs phase reading and processing of the entries read from
+ * the LDIF file(s). This task is used if the append flag wasn't specified.
+ */
+ private class ImportTask implements Callable<Void>
+ {
+ private final Storage storage;
+
+ public ImportTask(final Storage storage)
+ {
+ this.storage = storage;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public final Void call() throws Exception
+ {
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ call0(txn);
+ }
+ });
+ return null;
+ }
+
+ void call0(WriteableTransaction txn) throws Exception
+ {
+ try
+ {
+ EntryInformation entryInfo;
+ while ((entryInfo = reader.readEntry(dnSuffixMap)) != null)
+ {
+ if (importCfg.isCancelled() || isCanceled)
+ {
+ return;
+ }
+ processEntry(txn, entryInfo.getEntry(), entryInfo.getEntryID(), entryInfo.getSuffix());
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_IMPORT_LDIF_IMPORT_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ }
+
+ void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix)
+ throws DirectoryException, StorageRuntimeException, InterruptedException
+ {
+ DN entryDN = entry.getName();
+ if (validateDNs && !dnSanityCheck(txn, entry, suffix))
+ {
+ suffix.removePending(entryDN);
+ return;
+ }
+ suffix.removePending(entryDN);
+ processDN2ID(suffix, entryDN, entryID);
+ processDN2URI(suffix, entry);
+ processIndexes(suffix, entry, entryID, false);
+ processVLVIndexes(txn, suffix, entry, entryID);
+ suffix.getID2Entry().put(txn, entryID, entry);
+ importCount.getAndIncrement();
+ }
+
+ /**
+ * Examine the DN for duplicates and missing parents.
+ *
+ * @return true if the import operation can proceed with the provided entry, false otherwise
+ */
+ @SuppressWarnings("javadoc")
+ boolean dnSanityCheck(WriteableTransaction txn, Entry entry, Suffix suffix)
+ throws StorageRuntimeException, InterruptedException
+ {
+ //Perform parent checking.
+ DN entryDN = entry.getName();
+ DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
+ if (parentDN != null && !suffix.isParentProcessed(txn, parentDN, dnCache, clearedBackend))
+ {
+ reader.rejectEntry(entry, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
+ return false;
+ }
+ if (!insert(txn, entryDN, suffix, dnCache))
+ {
+ reader.rejectEntry(entry, WARN_IMPORT_ENTRY_EXISTS.get());
+ return false;
+ }
+ return true;
+ }
+
+ private boolean insert(WriteableTransaction txn, DN entryDN, Suffix suffix, DNCache dnCache)
+ {
+ //If the backend was not cleared, then first check dn2id
+ //for DNs that might not exist in the DN cache.
+ if (!clearedBackend && suffix.getDN2ID().get(txn, entryDN) != null)
+ {
+ return false;
+ }
+ return dnCache.insert(entryDN);
+ }
+
+ void processIndexes(Suffix suffix, Entry entry, EntryID entryID, boolean allIndexes)
+ throws StorageRuntimeException, InterruptedException
+ {
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
+ {
+ AttributeType attrType = mapEntry.getKey();
+ AttributeIndex attrIndex = mapEntry.getValue();
+ if (allIndexes || entry.hasAttribute(attrType))
+ {
+ for (Map.Entry<String, MatchingRuleIndex> mapEntry2 : attrIndex.getNameToIndexes().entrySet())
+ {
+ String indexID = mapEntry2.getKey();
+ MatchingRuleIndex index = mapEntry2.getValue();
+
+ IndexKey indexKey = new IndexKey(attrType, indexID, index.getIndexEntryLimit());
+ processAttribute(index, entry, entryID, indexKey);
+ }
+ }
+ }
+ }
+
+ void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID)
+ throws DirectoryException
+ {
+ final EntryContainer entryContainer = suffix.getEntryContainer();
+ final IndexBuffer buffer = new IndexBuffer(entryContainer);
+ for (VLVIndex vlvIdx : entryContainer.getVLVIndexes())
+ {
+ vlvIdx.addEntry(buffer, entryID, entry);
+ }
+ buffer.flush(txn);
+ }
+
+ void processAttribute(MatchingRuleIndex index, Entry entry, EntryID entryID, IndexKey indexKey)
+ throws StorageRuntimeException, InterruptedException
+ {
+ for (ByteString key : index.indexEntry(entry))
+ {
+ processKey(index, key, entryID, indexKey);
+ }
+ }
+
+ final int processKey(Tree tree, ByteString key, EntryID entryID, IndexKey indexKey) throws InterruptedException
+ {
+ // TODO JNR implement
+ return -1;
+ }
+
+ void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
+ {
+ // TODO JNR implement
+ }
+
+ private void processDN2URI(Suffix suffix, Entry entry)
+ {
+ // TODO JNR implement
+ }
+ }
+
+ /** This class reports progress of first phase of import processing at fixed intervals. */
+ private final class FirstPhaseProgressTask extends TimerTask
+ {
+ /** The number of entries that had been read at the time of the previous progress report. */
+ private long previousCount;
+ /** The time in milliseconds of the previous progress report. */
+ private long previousTime;
+
+ /** Create a new import progress task. */
+ public FirstPhaseProgressTask()
+ {
+ previousTime = System.currentTimeMillis();
+ }
+
+ /** The action to be performed by this timer task. */
+ @Override
+ public void run()
+ {
+ long entriesRead = reader.getEntriesRead();
+ long entriesIgnored = reader.getEntriesIgnored();
+ long entriesRejected = reader.getEntriesRejected();
+ long deltaCount = entriesRead - previousCount;
+
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ if (deltaTime == 0)
+ {
+ return;
+ }
+ float rate = 1000f * deltaCount / deltaTime;
+ logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
+
+ previousCount = entriesRead;
+ previousTime = latestTime;
+ }
+ }
+
+ /** This class reports progress of the second phase of import processing at fixed intervals. */
+ private class SecondPhaseProgressTask extends TimerTask
+ {
+ /** The time in milliseconds of the previous progress report. */
+ private long previousTime;
+
+ /** Create a new import progress task. */
+ public SecondPhaseProgressTask()
+ {
+ previousTime = System.currentTimeMillis();
+ }
+
+ /** The action to be performed by this timer task. */
+ @Override
+ public void run()
+ {
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ if (deltaTime == 0)
+ {
+ return;
+ }
+
+ previousTime = latestTime;
+
+ // DN index managers first.
+ printStats(deltaTime, true);
+ // non-DN index managers second
+ printStats(deltaTime, false);
+ }
+
+ private void printStats(long deltaTime, boolean dn2id)
+ {
+ // TODO JNR
+ }
+ }
+
+ /** Invocation handler for the {@link PluggableBackendCfg} proxy. */
+ private static final class BackendCfgHandler implements InvocationHandler
+ {
+ private final Map<String, Object> returnValues;
+
+ private BackendCfgHandler(final Map<String, Object> returnValues)
+ {
+ this.returnValues = returnValues;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ final String methodName = method.getName();
+ if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
+ {
+ // ignore calls to (add|remove)*ChangeListener() methods
+ return null;
+ }
+
+ final Object returnValue = returnValues.get(methodName);
+ if (returnValue != null)
+ {
+ return returnValue;
+ }
+ throw new IllegalArgumentException("Unhandled method call on proxy ("
+ + BackendCfgHandler.class.getSimpleName()
+ + ") for method (" + method
+ + ") with arguments (" + Arrays.toString(args) + ")");
+ }
+ }
+
+ /**
+ * Used to check DN's when DN validation is performed during phase one processing.
+ * It is deleted after phase one processing.
+ */
+ private final class DNCacheImpl implements DNCache
+ {
+ private static final String DB_NAME = "dn_cache";
+ private final TreeName dnCache = new TreeName("", DB_NAME);
+ private final Storage storage;
+
+ private DNCacheImpl(File dnCachePath) throws StorageRuntimeException
+ {
+ final Map<String, Object> returnValues = new HashMap<>();
+ returnValues.put("getDBDirectory", dnCachePath.getAbsolutePath());
+ returnValues.put("getBackendId", DB_NAME);
+ returnValues.put("getDBCacheSize", 0L);
+ returnValues.put("getDBCachePercent", 10);
+ returnValues.put("isDBTxnNoSync", true);
+ returnValues.put("getDBDirectoryPermissions", "700");
+ returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
+ returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
+ try
+ {
+ returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importDNCache,cn=Backends,cn=config"));
+ storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
+ DirectoryServer.getInstance().getServerContext());
+ storage.open();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ txn.openTree(dnCache);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
+ {
+ return (PersistitBackendCfg) Proxy.newProxyInstance(
+ getClass().getClassLoader(),
+ new Class<?>[] { PersistitBackendCfg.class },
+ new BackendCfgHandler(returnValues));
+ }
+
+ private static final long FNV_INIT = 0xcbf29ce484222325L;
+ private static final long FNV_PRIME = 0x100000001b3L;
+
+ /** Hash the DN bytes. Uses the FNV-1a hash. */
+ private ByteString fnv1AHashCode(DN dn)
+ {
+ final ByteString b = dn.toNormalizedByteString();
+
+ long hash = FNV_INIT;
+ for (int i = 0; i < b.length(); i++)
+ {
+ hash ^= b.byteAt(i);
+ hash *= FNV_PRIME;
+ }
+ return ByteString.valueOf(hash);
+ }
+
+ @Override
+ public void close() throws StorageRuntimeException
+ {
+ try
+ {
+ storage.close();
+ }
+ finally
+ {
+ storage.removeStorageFiles();
+ }
+ }
+
+ @Override
+ public boolean insert(DN dn) throws StorageRuntimeException
+ {
+ // Use a compact representation for key
+ // and a reversible representation for value
+ final ByteString key = fnv1AHashCode(dn);
+ final ByteString dnValue = ByteString.valueOf(dn);
+
+ return insert(key, dnValue);
+ }
+
+ private boolean insert(final ByteString key, final ByteString dn) throws StorageRuntimeException
+ {
+ final AtomicBoolean updateResult = new AtomicBoolean();
+ try
+ {
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ updateResult.set(txn.update(dnCache, key, new UpdateFunction()
+ {
+ @Override
+ public ByteSequence computeNewValue(ByteSequence existingDns)
+ {
+ if (containsDN(existingDns, dn))
+ {
+ // no change
+ return existingDns;
+ }
+ else if (existingDns != null)
+ {
+ return addDN(existingDns, dn);
+ }
+ else
+ {
+ return singletonList(dn);
+ }
+ }
+
+ /** Add the DN to the DNs because of a hash collision. */
+ private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
+ {
+ final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
+ builder.append(dnList);
+ builder.append(dntoAdd.length());
+ builder.append(dntoAdd);
+ return builder;
+ }
+
+ /** Create a list of dn made of one element. */
+ private ByteSequence singletonList(final ByteSequence dntoAdd)
+ {
+ final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
+ singleton.append(dntoAdd.length());
+ singleton.append(dntoAdd);
+ return singleton;
+ }
+ }));
+ }
+ });
+ return updateResult.get();
+ }
+ catch (StorageRuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
+ private boolean containsDN(ByteSequence existingDns, ByteString dnToFind)
+ {
+ if (existingDns != null && existingDns.length() > 0)
+ {
+ final ByteSequenceReader reader = existingDns.asReader();
+ int pos = 0;
+ while (reader.remaining() != 0)
+ {
+ int dnLength = reader.getInt();
+ int dnStart = pos + INT_SIZE;
+ ByteSequence existingDn = existingDns.subSequence(dnStart, dnStart + dnLength);
+ if (dnToFind.equals(existingDn))
+ {
+ return true;
+ }
+ reader.skip(dnLength);
+ pos = reader.position();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean contains(final DN dn)
+ {
+ try
+ {
+ return storage.read(new ReadOperation<Boolean>()
+ {
+ @Override
+ public Boolean run(ReadableTransaction txn) throws Exception
+ {
+ final ByteString key = fnv1AHashCode(dn);
+ final ByteString existingDns = txn.read(dnCache, key);
+
+ return containsDN(existingDns, ByteString.valueOf(dn));
+ }
+ });
+ }
+ catch (StorageRuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+ }
+}
--
Gitblit v1.10.0