/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2015 ForgeRock AS */ package org.opends.server.backends.pluggable; import static org.opends.messages.BackendMessages.*; import static org.opends.server.backends.pluggable.DnKeyFormat.*; import static org.opends.server.core.DirectoryServer.*; import static org.opends.server.util.DynamicConstants.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigException; import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.opendj.ldap.ByteSequenceReader; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.ByteStringBuilder; import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType; import org.opends.server.admin.std.server.BackendIndexCfg; import org.opends.server.admin.std.server.PersistitBackendCfg; import org.opends.server.admin.std.server.PluggableBackendCfg; import org.opends.server.backends.persistit.PersistItStorage; import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex; import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation; import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.DNCache; import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.IndexKey; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.ReadOperation; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.Storage; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.backends.pluggable.spi.TreeName; import org.opends.server.backends.pluggable.spi.UpdateFunction; import org.opends.server.backends.pluggable.spi.WriteOperation; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ServerContext; import org.opends.server.types.AttributeType; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.InitializationException; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; import org.opends.server.util.Platform; /** * This class provides the engine that performs both importing of LDIF files and * the rebuilding of indexes. */ final class OnDiskMergeStorageImporter { /** * Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings. */ @SuppressWarnings("javadoc") static final class StrategyImpl implements ImportStrategy { private final PluggableBackendCfg backendCfg; StrategyImpl(PluggableBackendCfg backendCfg) { this.backendCfg = backendCfg; } @Override public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext) throws DirectoryException, InitializationException { try { return new OnDiskMergeStorageImporter(rootContainer, importConfig, backendCfg, serverContext).processImport(); } catch (DirectoryException | InitializationException e) { logger.traceException(e); throw e; } catch (ConfigException e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e); } catch (Exception e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(stackTraceToSingleLineString(e)), e); } } } private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private static final int TIMER_INTERVAL = 10000; private static final String DEFAULT_TMP_DIR = "import-tmp"; private static final String DN_CACHE_DIR = "dn-cache"; /** Defaults for DB cache. */ private static final int MAX_DB_CACHE_SIZE = 8 * MB; private static final int MAX_DB_LOG_SIZE = 10 * MB; private static final int MIN_DB_CACHE_SIZE = 4 * MB; /** * Defaults for LDIF reader buffers, min memory required to import and default * size for byte buffers. */ private static final int READER_WRITER_BUFFER_SIZE = 8 * KB; private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE + MAX_DB_LOG_SIZE; /** Max size of phase one buffer. */ private static final int MAX_BUFFER_SIZE = 2 * MB; /** Min size of phase one buffer. */ private static final int MIN_BUFFER_SIZE = 4 * KB; /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */ private static final int SMALL_HEAP_SIZE = 256 * MB; /** Root container. */ private final RootContainer rootContainer; /** Import configuration. */ private final LDIFImportConfig importCfg; private final ServerContext serverContext; /** LDIF reader. */ private ImportLDIFReader reader; /** Phase one imported entries count. */ private final AtomicLong importCount = new AtomicLong(0); /** Migrated entry count. */ private int migratedCount; /** Phase one buffer size in bytes. */ private int bufferSize; /** Index count. */ private final int indexCount; /** Thread count. */ private int threadCount; /** Whether DN validation should be performed. If true, then it is performed during phase one. */ private final boolean validateDNs; /** Temp scratch directory. */ private final File tempDir; /** DN cache used when DN validation is done in first phase. */ private final DNCache dnCache; /** Size in bytes of DN cache. */ private long dnCacheSize; /** Available memory at the start of the import. */ private long availableMemory; /** Size in bytes of DB cache. */ private long dbCacheSize; /** Map of DNs to Suffix objects. */ private final Map dnSuffixMap = new LinkedHashMap<>(); /** Set to true if the backend was cleared. */ private final boolean clearedBackend; /** Used to shutdown import if an error occurs in phase one. */ private volatile boolean isCanceled; /** Number of phase one buffers. */ private int phaseOneBufferCount; private OnDiskMergeStorageImporter(RootContainer rootContainer, LDIFImportConfig importCfg, PluggableBackendCfg backendCfg, ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException { this.rootContainer = rootContainer; this.importCfg = importCfg; this.serverContext = serverContext; if (importCfg.getThreadCount() == 0) { this.threadCount = Runtime.getRuntime().availableProcessors() * 2; } else { this.threadCount = importCfg.getThreadCount(); } // Determine the number of indexes. this.indexCount = getTotalIndexCount(backendCfg); this.clearedBackend = mustClearBackend(importCfg, backendCfg); validateDNs = !importCfg.getSkipDNValidation(); this.tempDir = prepareTempDir(backendCfg, importCfg.getTmpDirectory()); // be careful: requires that a few data has been set computeMemoryRequirements(); if (validateDNs) { final File dnCachePath = new File(tempDir, DN_CACHE_DIR); dnCachePath.mkdirs(); this.dnCache = new DNCacheImpl(dnCachePath); } else { this.dnCache = null; } } private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException { File parentDir = getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR); File tempDir = new File(parentDir, backendCfg.getBackendId()); recursiveDelete(tempDir); if (!tempDir.exists() && !tempDir.mkdirs()) { throw new InitializationException(ERR_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir)); } return tempDir; } /** * Returns whether the backend must be cleared. * * @param importCfg * the import configuration object * @param backendCfg * the backend configuration object * @return true if the backend must be cleared, false otherwise * @see #prepareSuffix(WriteableTransaction, EntryContainer) for per-suffix cleanups. */ private static boolean mustClearBackend(LDIFImportConfig importCfg, PluggableBackendCfg backendCfg) { return !importCfg.appendToExistingData() && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1); /* * Why do we clear when there is only one baseDN? * any baseDN for which data is imported will be cleared anyway (see getSuffix()), * so if there is only one baseDN for this backend, then clear it now. */ } private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException { int indexes = 2; // dn2id, dn2uri for (String indexName : backendCfg.listBackendIndexes()) { BackendIndexCfg index = backendCfg.getBackendIndex(indexName); SortedSet types = index.getIndexType(); if (types.contains(IndexType.EXTENSIBLE)) { indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size(); } else { indexes += types.size(); } } return indexes; } /** * Calculate buffer sizes and initialize properties based on memory. * * @throws InitializationException * If a problem occurs during calculation. */ private void computeMemoryRequirements() throws InitializationException { // Calculate amount of usable memory. This will need to take into account // various fudge factors, including the number of IO buffers used by the // scratch writers (1 per index). calculateAvailableMemory(); final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE); // We need caching when doing DN validation if (validateDNs) { // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers. if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) { dbCacheSize = 500 * KB; dnCacheSize = 500 * KB; } else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE)) { dbCacheSize = MIN_DB_CACHE_SIZE; dnCacheSize = MIN_DB_CACHE_SIZE; } else if (!clearedBackend) { // Appending to existing data so reserve extra memory for the DB cache // since it will be needed for dn2id queries. dbCacheSize = usableMemory * 33 / 100; dnCacheSize = usableMemory * 33 / 100; } else { dbCacheSize = MAX_DB_CACHE_SIZE; dnCacheSize = usableMemory * 66 / 100; } } else { // No DN validation: calculate memory for DB cache and buffers. // No need for DN2ID cache. dnCacheSize = 0; if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) { dbCacheSize = 500 * KB; } else if (usableMemory < MIN_DB_CACHE_MEMORY) { dbCacheSize = MIN_DB_CACHE_SIZE; } else { // No need to differentiate between append/clear backend, since dn2id is // not being queried. dbCacheSize = MAX_DB_CACHE_SIZE; } } final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize; final int oldThreadCount = threadCount; if (indexCount != 0) // Avoid / by zero { while (true) { phaseOneBufferCount = 2 * indexCount * threadCount; // Scratch writers allocate 4 buffers per index as well. final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount); long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount; // We need (2 * bufferSize) to fit in an int for the insertByteStream // and deleteByteStream constructors. bufferSize = (int) Math.min(longBufferSize, Integer.MAX_VALUE / 2); if (bufferSize > MAX_BUFFER_SIZE) { if (validateDNs) { // The buffers are big enough: the memory is best used for the DN2ID temp DB bufferSize = MAX_BUFFER_SIZE; final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize); if (!clearedBackend) { dbCacheSize += extraMemory / 2; dnCacheSize += extraMemory / 2; } else { dnCacheSize += extraMemory; } } break; } else if (bufferSize > MIN_BUFFER_SIZE) { // This is acceptable. break; } else if (threadCount > 1) { // Retry using less threads. threadCount--; } else { // Not enough memory. final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE; throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get( usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize)); } } } if (oldThreadCount != threadCount) { logger.info(NOTE_IMPORT_ADJUST_THREAD_COUNT, oldThreadCount, threadCount); } logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount); if (dnCacheSize > 0) { logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize); } logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize); } /** * Calculates the amount of available memory which can be used by this import, * taking into account whether or not the import is running offline or online * as a task. */ private void calculateAvailableMemory() { final long totalAvailableMemory; if (DirectoryServer.isRunning()) { // Online import/rebuild. final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory(); totalAvailableMemory = Math.max(availableMemory, 16 * MB); } else { // Offline import/rebuild. totalAvailableMemory = Platform.getUsableMemoryForCaching(); } // Now take into account various fudge factors. int importMemPct = 90; if (totalAvailableMemory <= SMALL_HEAP_SIZE) { // Be pessimistic when memory is low. importMemPct -= 25; } availableMemory = totalAvailableMemory * importMemPct / 100; } private void initializeSuffixes(WriteableTransaction txn) throws ConfigException, DirectoryException { for (EntryContainer ec : rootContainer.getEntryContainers()) { Suffix suffix = getSuffix(txn, ec); if (suffix != null) { dnSuffixMap.put(ec.getBaseDN(), suffix); } } } private Suffix getSuffix(WriteableTransaction txn, EntryContainer entryContainer) throws ConfigException, DirectoryException { if (importCfg.appendToExistingData() || importCfg.clearBackend()) { return new Suffix(entryContainer); } final DN baseDN = entryContainer.getBaseDN(); if (importCfg.getExcludeBranches().contains(baseDN)) { // This entire base DN was explicitly excluded. Skip. return null; } EntryContainer sourceEntryContainer = null; List excludeBranches = getDescendants(baseDN, importCfg.getExcludeBranches()); List includeBranches = null; if (!importCfg.getIncludeBranches().isEmpty()) { includeBranches = getDescendants(baseDN, importCfg.getIncludeBranches()); if (includeBranches.isEmpty()) { // There are no branches in the explicitly defined include list under this base DN. // Skip this base DN altogether. return null; } // Remove any overlapping include branches. Iterator includeBranchIterator = includeBranches.iterator(); while (includeBranchIterator.hasNext()) { DN includeDN = includeBranchIterator.next(); if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN)) { includeBranchIterator.remove(); } } // Remove any exclude branches that are not are not under a include branch // since they will be migrated as part of the existing entries // outside of the include branches anyways. Iterator excludeBranchIterator = excludeBranches.iterator(); while (excludeBranchIterator.hasNext()) { DN excludeDN = excludeBranchIterator.next(); if (!isAnyAncestorOf(includeBranches, excludeDN)) { excludeBranchIterator.remove(); } } if (excludeBranches.isEmpty() && includeBranches.size() == 1 && includeBranches.get(0).equals(baseDN)) { // This entire base DN is explicitly included in the import with // no exclude branches that we need to migrate. // Just clear the entry container. clearSuffix(entryContainer); } else { sourceEntryContainer = entryContainer; // Create a temp entry container DN tempDN = baseDN.child(DN.valueOf("dc=importTmp")); entryContainer = rootContainer.openEntryContainer(tempDN, txn); } } return new Suffix(entryContainer, sourceEntryContainer, includeBranches, excludeBranches); } private List getDescendants(DN baseDN, Set dns) { final List results = new ArrayList<>(); for (DN dn : dns) { if (baseDN.isAncestorOf(dn)) { results.add(dn); } } return results; } private static void clearSuffix(EntryContainer entryContainer) { entryContainer.lock(); entryContainer.clear(); entryContainer.unlock(); } private static boolean isAnyNotEqualAndAncestorOf(List dns, DN childDN) { for (DN dn : dns) { if (!dn.equals(childDN) && dn.isAncestorOf(childDN)) { return false; } } return true; } private static boolean isAnyAncestorOf(List dns, DN childDN) { for (DN dn : dns) { if (dn.isAncestorOf(childDN)) { return true; } } return false; } private LDIFImportResult processImport() throws Exception { try { try { reader = new ImportLDIFReader(importCfg, rootContainer); } catch (IOException ioe) { throw new InitializationException(ERR_IMPORT_LDIF_READER_IO_ERROR.get(), ioe); } logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER); logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount); final Storage storage = rootContainer.getStorage(); storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { initializeSuffixes(txn); setIndexesTrusted(txn, false); } }); final long startTime = System.currentTimeMillis(); importPhaseOne(); final long phaseOneFinishTime = System.currentTimeMillis(); if (validateDNs) { dnCache.close(); } if (isCanceled) { throw new InterruptedException("Import processing canceled."); } final long phaseTwoTime = System.currentTimeMillis(); importPhaseTwo(); if (isCanceled) { throw new InterruptedException("Import processing canceled."); } final long phaseTwoFinishTime = System.currentTimeMillis(); storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { setIndexesTrusted(txn, true); switchEntryContainers(txn); } }); recursiveDelete(tempDir); final long finishTime = System.currentTimeMillis(); final long importTime = finishTime - startTime; logger.info(NOTE_IMPORT_PHASE_STATS, importTime / 1000, (phaseOneFinishTime - startTime) / 1000, (phaseTwoFinishTime - phaseTwoTime) / 1000); float rate = 0; if (importTime > 0) { rate = 1000f * reader.getEntriesRead() / importTime; } logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(), reader.getEntriesIgnored(), reader.getEntriesRejected(), migratedCount, importTime / 1000, rate); return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); } finally { close(reader); if (validateDNs) { close(dnCache); } } } private void switchEntryContainers(WriteableTransaction txn) throws StorageRuntimeException, InitializationException { for (Suffix suffix : dnSuffixMap.values()) { DN baseDN = suffix.getBaseDN(); EntryContainer entryContainer = suffix.getSrcEntryContainer(); if (entryContainer != null) { final EntryContainer toDelete = rootContainer.unregisterEntryContainer(baseDN); toDelete.lock(); toDelete.close(); toDelete.delete(txn); toDelete.unlock(); final EntryContainer replacement = suffix.getEntryContainer(); replacement.lock(); replacement.setTreePrefix(baseDN.toNormalizedUrlSafeString()); replacement.unlock(); rootContainer.registerEntryContainer(baseDN, replacement); } } } private void setIndexesTrusted(WriteableTransaction txn, boolean trusted) throws StorageRuntimeException { try { for (Suffix s : dnSuffixMap.values()) { s.setIndexesTrusted(txn, trusted); } } catch (StorageRuntimeException ex) { throw new StorageRuntimeException(NOTE_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()).toString()); } } /** * Reads all entries from id2entry, and: *
    *
  1. compute how the entry is indexed for each index
  2. *
  3. store the result of indexing entries into in-memory index buffers
  4. *
  5. each time an in-memory index buffer is filled, sort it and write it to scratch files. * The scratch files will be read by phaseTwo to perform on-disk merge
  6. *
*/ private void importPhaseOne() throws Exception { final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); scheduleAtFixedRate(timerService, new FirstPhaseProgressTask()); final ExecutorService execService = Executors.newFixedThreadPool(threadCount); final Storage storage = rootContainer.getStorage(); execService.submit(new MigrateExistingTask(storage)).get(); final List> tasks = new ArrayList<>(threadCount); if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries()) { for (int i = 0; i < threadCount; i++) { tasks.add(new ImportTask(storage)); } } execService.invokeAll(tasks); tasks.clear(); execService.submit(new MigrateExcludedTask(storage)).get(); shutdownAll(timerService, execService); } private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task) { timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS); } private static void shutdownAll(ExecutorService... executorServices) throws InterruptedException { for (ExecutorService executorService : executorServices) { executorService.shutdown(); } for (ExecutorService executorService : executorServices) { executorService.awaitTermination(30, TimeUnit.SECONDS); } } private void importPhaseTwo() throws Exception { ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); scheduleAtFixedRate(timerService, new SecondPhaseProgressTask()); try { // TODO JNR } finally { shutdownAll(timerService); } } /** Task used to migrate excluded branch. */ private final class MigrateExcludedTask extends ImportTask { private MigrateExcludedTask(final Storage storage) { super(storage); } @Override void call0(WriteableTransaction txn) throws Exception { for (Suffix suffix : dnSuffixMap.values()) { EntryContainer entryContainer = suffix.getSrcEntryContainer(); if (entryContainer != null && !suffix.getExcludeBranches().isEmpty()) { logger.info(NOTE_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN()); Cursor cursor = txn.openCursor(entryContainer.getDN2ID().getName()); try { for (DN excludedDN : suffix.getExcludeBranches()) { final ByteString key = dnToDNKey(excludedDN, suffix.getBaseDN().size()); boolean success = cursor.positionToKeyOrNext(key); if (success && key.equals(cursor.getKey())) { /* * This is the base entry for a branch that was excluded in the * import so we must migrate all entries in this branch over to * the new entry container. */ ByteStringBuilder end = afterKey(key); while (success && ByteSequence.COMPARATOR.compare(key, end) < 0 && !importCfg.isCancelled() && !isCanceled) { EntryID id = new EntryID(cursor.getValue()); Entry entry = entryContainer.getID2Entry().get(txn, id); processEntry(txn, entry, rootContainer.getNextEntryID(), suffix); migratedCount++; success = cursor.next(); } } } } catch (Exception e) { logger.error(ERR_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage()); isCanceled = true; throw e; } finally { close(cursor); } } } } } /** Task to migrate existing entries. */ private final class MigrateExistingTask extends ImportTask { private MigrateExistingTask(final Storage storage) { super(storage); } @Override void call0(WriteableTransaction txn) throws Exception { for (Suffix suffix : dnSuffixMap.values()) { EntryContainer entryContainer = suffix.getSrcEntryContainer(); if (entryContainer != null && !suffix.getIncludeBranches().isEmpty()) { logger.info(NOTE_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN()); Cursor cursor = txn.openCursor(entryContainer.getDN2ID().getName()); try { final List includeBranches = includeBranchesAsBytes(suffix); boolean success = cursor.next(); while (success && !importCfg.isCancelled() && !isCanceled) { final ByteString key = cursor.getKey(); if (!includeBranches.contains(key)) { EntryID id = new EntryID(key); Entry entry = entryContainer.getID2Entry().get(txn, id); processEntry(txn, entry, rootContainer.getNextEntryID(), suffix); migratedCount++; success = cursor.next(); } else { /* * This is the base entry for a branch that will be included * in the import so we do not want to copy the branch to the * new entry container. */ /* * Advance the cursor to next entry at the same level in the DIT * skipping all the entries in this branch. */ ByteStringBuilder begin = afterKey(key); success = cursor.positionToKeyOrNext(begin); } } } catch (Exception e) { logger.error(ERR_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage()); isCanceled = true; throw e; } finally { close(cursor); } } } } private List includeBranchesAsBytes(Suffix suffix) { List includeBranches = new ArrayList<>(suffix.getIncludeBranches().size()); for (DN includeBranch : suffix.getIncludeBranches()) { if (includeBranch.isDescendantOf(suffix.getBaseDN())) { includeBranches.add(dnToDNKey(includeBranch, suffix.getBaseDN().size())); } } return includeBranches; } } /** * This task performs phase reading and processing of the entries read from * the LDIF file(s). This task is used if the append flag wasn't specified. */ private class ImportTask implements Callable { private final Storage storage; public ImportTask(final Storage storage) { this.storage = storage; } /** {@inheritDoc} */ @Override public final Void call() throws Exception { storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { call0(txn); } }); return null; } void call0(WriteableTransaction txn) throws Exception { try { EntryInformation entryInfo; while ((entryInfo = reader.readEntry(dnSuffixMap)) != null) { if (importCfg.isCancelled() || isCanceled) { return; } processEntry(txn, entryInfo.getEntry(), entryInfo.getEntryID(), entryInfo.getSuffix()); } } catch (Exception e) { logger.error(ERR_IMPORT_LDIF_IMPORT_TASK_ERR, e.getMessage()); isCanceled = true; throw e; } } void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix) throws DirectoryException, StorageRuntimeException, InterruptedException { DN entryDN = entry.getName(); if (validateDNs && !dnSanityCheck(txn, entry, suffix)) { suffix.removePending(entryDN); return; } suffix.removePending(entryDN); processDN2ID(suffix, entryDN, entryID); processDN2URI(suffix, entry); processIndexes(suffix, entry, entryID, false); processVLVIndexes(txn, suffix, entry, entryID); suffix.getID2Entry().put(txn, entryID, entry); importCount.getAndIncrement(); } /** * Examine the DN for duplicates and missing parents. * * @return true if the import operation can proceed with the provided entry, false otherwise */ @SuppressWarnings("javadoc") boolean dnSanityCheck(WriteableTransaction txn, Entry entry, Suffix suffix) throws StorageRuntimeException, InterruptedException { //Perform parent checking. DN entryDN = entry.getName(); DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN); if (parentDN != null && !suffix.isParentProcessed(txn, parentDN, dnCache, clearedBackend)) { reader.rejectEntry(entry, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN)); return false; } if (!insert(txn, entryDN, suffix, dnCache)) { reader.rejectEntry(entry, WARN_IMPORT_ENTRY_EXISTS.get()); return false; } return true; } private boolean insert(WriteableTransaction txn, DN entryDN, Suffix suffix, DNCache dnCache) { //If the backend was not cleared, then first check dn2id //for DNs that might not exist in the DN cache. if (!clearedBackend && suffix.getDN2ID().get(txn, entryDN) != null) { return false; } return dnCache.insert(entryDN); } void processIndexes(Suffix suffix, Entry entry, EntryID entryID, boolean allIndexes) throws StorageRuntimeException, InterruptedException { for (Map.Entry mapEntry : suffix.getAttrIndexMap().entrySet()) { AttributeType attrType = mapEntry.getKey(); AttributeIndex attrIndex = mapEntry.getValue(); if (allIndexes || entry.hasAttribute(attrType)) { for (Map.Entry mapEntry2 : attrIndex.getNameToIndexes().entrySet()) { String indexID = mapEntry2.getKey(); MatchingRuleIndex index = mapEntry2.getValue(); IndexKey indexKey = new IndexKey(attrType, indexID, index.getIndexEntryLimit()); processAttribute(index, entry, entryID, indexKey); } } } } void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException { final EntryContainer entryContainer = suffix.getEntryContainer(); final IndexBuffer buffer = new IndexBuffer(entryContainer); for (VLVIndex vlvIdx : entryContainer.getVLVIndexes()) { vlvIdx.addEntry(buffer, entryID, entry); } buffer.flush(txn); } void processAttribute(MatchingRuleIndex index, Entry entry, EntryID entryID, IndexKey indexKey) throws StorageRuntimeException, InterruptedException { for (ByteString key : index.indexEntry(entry)) { processKey(index, key, entryID, indexKey); } } final int processKey(Tree tree, ByteString key, EntryID entryID, IndexKey indexKey) throws InterruptedException { // TODO JNR implement return -1; } void processDN2ID(Suffix suffix, DN dn, EntryID entryID) { // TODO JNR implement } private void processDN2URI(Suffix suffix, Entry entry) { // TODO JNR implement } } /** This class reports progress of first phase of import processing at fixed intervals. */ private final class FirstPhaseProgressTask extends TimerTask { /** The number of entries that had been read at the time of the previous progress report. */ private long previousCount; /** The time in milliseconds of the previous progress report. */ private long previousTime; /** Create a new import progress task. */ public FirstPhaseProgressTask() { previousTime = System.currentTimeMillis(); } /** The action to be performed by this timer task. */ @Override public void run() { long entriesRead = reader.getEntriesRead(); long entriesIgnored = reader.getEntriesIgnored(); long entriesRejected = reader.getEntriesRejected(); long deltaCount = entriesRead - previousCount; long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } float rate = 1000f * deltaCount / deltaTime; logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate); previousCount = entriesRead; previousTime = latestTime; } } /** This class reports progress of the second phase of import processing at fixed intervals. */ private class SecondPhaseProgressTask extends TimerTask { /** The time in milliseconds of the previous progress report. */ private long previousTime; /** Create a new import progress task. */ public SecondPhaseProgressTask() { previousTime = System.currentTimeMillis(); } /** The action to be performed by this timer task. */ @Override public void run() { long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } previousTime = latestTime; // DN index managers first. printStats(deltaTime, true); // non-DN index managers second printStats(deltaTime, false); } private void printStats(long deltaTime, boolean dn2id) { // TODO JNR } } /** Invocation handler for the {@link PluggableBackendCfg} proxy. */ private static final class BackendCfgHandler implements InvocationHandler { private final Map returnValues; private BackendCfgHandler(final Map returnValues) { this.returnValues = returnValues; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final String methodName = method.getName(); if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener")) { // ignore calls to (add|remove)*ChangeListener() methods return null; } final Object returnValue = returnValues.get(methodName); if (returnValue != null) { return returnValue; } throw new IllegalArgumentException("Unhandled method call on proxy (" + BackendCfgHandler.class.getSimpleName() + ") for method (" + method + ") with arguments (" + Arrays.toString(args) + ")"); } } /** * Used to check DN's when DN validation is performed during phase one processing. * It is deleted after phase one processing. */ private final class DNCacheImpl implements DNCache { private static final String DB_NAME = "dn_cache"; private final TreeName dnCache = new TreeName("", DB_NAME); private final Storage storage; private DNCacheImpl(File dnCachePath) throws StorageRuntimeException { final Map returnValues = new HashMap<>(); returnValues.put("getDBDirectory", dnCachePath.getAbsolutePath()); returnValues.put("getBackendId", DB_NAME); returnValues.put("getDBCacheSize", 0L); returnValues.put("getDBCachePercent", 10); returnValues.put("isDBTxnNoSync", true); returnValues.put("getDBDirectoryPermissions", "700"); returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB)); returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB)); try { returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importDNCache,cn=Backends,cn=config")); storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues), DirectoryServer.getInstance().getServerContext()); storage.open(); storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { txn.openTree(dnCache); } }); } catch (Exception e) { throw new StorageRuntimeException(e); } } private PersistitBackendCfg newPersistitBackendCfgProxy(Map returnValues) { return (PersistitBackendCfg) Proxy.newProxyInstance( getClass().getClassLoader(), new Class[] { PersistitBackendCfg.class }, new BackendCfgHandler(returnValues)); } private static final long FNV_INIT = 0xcbf29ce484222325L; private static final long FNV_PRIME = 0x100000001b3L; /** Hash the DN bytes. Uses the FNV-1a hash. */ private ByteString fnv1AHashCode(DN dn) { final ByteString b = dn.toNormalizedByteString(); long hash = FNV_INIT; for (int i = 0; i < b.length(); i++) { hash ^= b.byteAt(i); hash *= FNV_PRIME; } return ByteString.valueOf(hash); } @Override public void close() throws StorageRuntimeException { try { storage.close(); } finally { storage.removeStorageFiles(); } } @Override public boolean insert(DN dn) throws StorageRuntimeException { // Use a compact representation for key // and a reversible representation for value final ByteString key = fnv1AHashCode(dn); final ByteString dnValue = ByteString.valueOf(dn); return insert(key, dnValue); } private boolean insert(final ByteString key, final ByteString dn) throws StorageRuntimeException { final AtomicBoolean updateResult = new AtomicBoolean(); try { storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { updateResult.set(txn.update(dnCache, key, new UpdateFunction() { @Override public ByteSequence computeNewValue(ByteSequence existingDns) { if (containsDN(existingDns, dn)) { // no change return existingDns; } else if (existingDns != null) { return addDN(existingDns, dn); } else { return singletonList(dn); } } /** Add the DN to the DNs because of a hash collision. */ private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd) { final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length()); builder.append(dnList); builder.append(dntoAdd.length()); builder.append(dntoAdd); return builder; } /** Create a list of dn made of one element. */ private ByteSequence singletonList(final ByteSequence dntoAdd) { final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE); singleton.append(dntoAdd.length()); singleton.append(dntoAdd); return singleton; } })); } }); return updateResult.get(); } catch (StorageRuntimeException e) { throw e; } catch (Exception e) { throw new StorageRuntimeException(e); } } /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */ private boolean containsDN(ByteSequence existingDns, ByteString dnToFind) { if (existingDns != null && existingDns.length() > 0) { final ByteSequenceReader reader = existingDns.asReader(); int pos = 0; while (reader.remaining() != 0) { int dnLength = reader.getInt(); int dnStart = pos + INT_SIZE; ByteSequence existingDn = existingDns.subSequence(dnStart, dnStart + dnLength); if (dnToFind.equals(existingDn)) { return true; } reader.skip(dnLength); pos = reader.position(); } } return false; } @Override public boolean contains(final DN dn) { try { return storage.read(new ReadOperation() { @Override public Boolean run(ReadableTransaction txn) throws Exception { final ByteString key = fnv1AHashCode(dn); final ByteString existingDns = txn.read(dnCache, key); return containsDN(existingDns, ByteString.valueOf(dn)); } }); } catch (StorageRuntimeException e) { throw e; } catch (Exception e) { throw new StorageRuntimeException(e); } } } }