| | |
| | | import static org.forgerock.opendj.ldap.ResultCode.*; |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.server.util.DynamicConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.forgerock.opendj.ldap.spi.Indexer; |
| | | import org.forgerock.opendj.server.config.meta.BackendIndexCfgDefn.IndexType; |
| | | import org.forgerock.opendj.server.config.server.BackendIndexCfg; |
| | | import org.forgerock.opendj.server.config.server.PluggableBackendCfg; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.Utils; |
| | | import org.forgerock.util.promise.PromiseImpl; |
| | | import org.opends.server.api.CompressedSchema; |
| | | import org.opends.server.backends.RebuildConfig; |
| | | import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex; |
| | |
| | | } |
| | | |
| | | @Override |
| | | public LDIFImportResult importLDIF(LDIFImportConfig importConfig) |
| | | throws InitializationException, ConfigException, InterruptedException, ExecutionException |
| | | { |
| | | logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION); |
| | | |
| | | final OnDiskMergeImporter importer; |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | try (final LDIFReaderSource source = |
| | | new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount)) |
| | | { |
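| | | // Note: phase one (doImport below) streams entries from the LDIF source into per-tree chunks; |
| | | // phase two merges the sorted chunks into the backend trees. |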
| | | final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory()); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport()) |
| | | { |
| | | final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers(); |
| | | final AbstractTwoPhaseImportStrategy importStrategy = |
| | | new ExternalSortAndImportStrategy(entryContainers, dbStorage, tempDir, bufferPool, sorter); |
| | | importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy); |
| | | importer.doImport(source); |
| | | } |
| | | finally |
| | | { |
| | | sorter.shutdownNow(); |
| | | if (OperatingSystem.isWindows()) |
| | | { |
| | | // Try to force the JVM to close mmap()ed files so that they can be deleted. |
| | | // (see http://bugs.java.com/view_bug.do?bug_id=4715154) |
| | | System.gc(); |
| | | Runtime.getRuntime().runFinalization(); |
| | | } |
| | | recursiveDelete(tempDir); |
| | | } |
| | | logger.info(NOTE_IMPORT_PHASE_STATS, |
| | | importer.getTotalTimeInMillis() / 1000, |
| | | importer.getPhaseOneTimeInMillis() / 1000, |
| | | importer.getPhaseTwoTimeInMillis() / 1000); |
| | | |
| | | final long importTime = System.currentTimeMillis() - startTime; |
| | | float rate = 0; |
| | | if (importTime > 0) |
| | | { |
| | | rate = 1000f * source.getEntriesRead() / importTime; |
| | | } |
| | | logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), |
| | | source.getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate); |
| | | |
| | | return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), |
| | | source.getEntriesIgnored()); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ExecutionException(e); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void rebuildIndex(final RebuildConfig rebuildConfig) |
| | | throws InitializationException, ExecutionException, ConfigException, InterruptedException |
| | | { |
| | | final EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN()); |
| | | final long totalEntries; |
| | | try |
| | | { |
| | | totalEntries = rootContainer.getStorage().read(new ReadOperation<Long>() |
| | | { |
| | | @Override |
| | | public Long run(ReadableTransaction txn) throws Exception |
| | | { |
| | | return entryContainer.getID2Entry().getRecordCount(txn); |
| | | } |
| | | }); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ExecutionException(e); |
| | | } |
| | | |
| | | final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries); |
| | | if (rebuildConfig.isClearDegradedState()) |
| | |
| | | } |
| | | } |
| | | |
| | | private void clearDegradedState(final EntryContainer entryContainer, final Set<String> indexes) |
| | | throws ExecutionException |
| | | { |
| | | try |
| | | { |
| | | rootContainer.getStorage().write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) |
| | | { |
| | | visitIndexes(entryContainer, visitOnlyIndexes(indexes, setTrust(true, txn))); |
| | | } |
| | | }); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ExecutionException(e); |
| | | } |
| | | } |
| | | |
| | | private void rebuildIndex(EntryContainer entryContainer, String tmpDirectory, Set<String> indexesToRebuild, |
| | | long totalEntries) throws InitializationException, ConfigException, InterruptedException, ExecutionException |
| | | { |
| | | if (indexesToRebuild.isEmpty()) |
| | | { |
| | |
| | | } |
| | | |
| | | /** Source of LDAP {@link Entry}s to process. */ |
| | | private interface Source extends Closeable |
| | | { |
| | | /** Process {@link Entry}s extracted from a {@link Source}. */ |
| | | interface EntryProcessor |
| | | { |
| | | void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws Exception; |
| | | } |
| | | |
| | | void processAllEntries(EntryProcessor processor) throws InterruptedException, ExecutionException; |
| | | |
| | | boolean isCancelled(); |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | closeSilently(reader); |
| | | } |
| | | |
| | | @Override |
| | | public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException |
| | | { |
| | | final ScheduledExecutorService scheduler = |
| | | Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true)); |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | executor.shutdown(); |
| | | } |
| | | |
| | | @Override |
| | | public void processAllEntries(final EntryProcessor entryProcessor) throws InterruptedException, ExecutionException |
| | | { |
| | | final ScheduledExecutorService scheduler = |
| | | Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true)); |
| | | scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS); |
| | | final PromiseImpl<Void, ExecutionException> promise = PromiseImpl.create(); |
| | | final ID2Entry id2Entry = entryContainer.getID2Entry(); |
| | | try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(id2Entry.getName())) |
| | | { |
| | |
| | | new EntryID(key), id2Entry.entryFromDatabase(value, schema)); |
| | | nbEntriesProcessed.incrementAndGet(); |
| | | } |
| | | catch (ExecutionException e) |
| | | { |
| | | interrupted = true; |
| | | promise.handleException(e); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | interrupted = true; |
| | | promise.handleException(new ExecutionException(e)); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | |
| | | // Forward exception if any |
| | | if (promise.isDone()) |
| | | { |
| | | promise.getOrThrow(); |
| | | } |
| | | } |
| | | |
| | |
| | | this.importStrategy = importStrategy; |
| | | } |
| | | |
| | | private void doImport(final Source source) throws InterruptedException, ExecutionException |
| | | { |
| | | final long phaseOneStartTime = System.currentTimeMillis(); |
| | | final PhaseOneWriteableTransaction transaction = new PhaseOneWriteableTransaction(importStrategy); |
| | |
| | | final ConcurrentMap<EntryContainer, CountDownLatch> importedContainers = new ConcurrentHashMap<>(); |
| | | |
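| | | // Each EntryContainer must be prepared exactly once before its first entry is imported; the |
| | | // per-container CountDownLatch below makes concurrent worker threads wait until |
| | | // beforePhaseOne() has completed for that container. |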
| | | // Start phase one |
| | | try |
| | | { |
| | | source.processAllEntries(new Source.EntryProcessor() |
| | | { |
| | | @Override |
| | | public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException, |
| | | InterruptedException |
| | | { |
| | | CountDownLatch latch = importedContainers.get(container); |
| | | if (latch == null) |
| | | { |
| | | final CountDownLatch newLatch = new CountDownLatch(1); |
| | | if (importedContainers.putIfAbsent(container, newLatch) == null) |
| | | { |
| | | try |
| | | { |
| | | importStrategy.beforePhaseOne(container); |
| | | } |
| | | finally |
| | | { |
| | | newLatch.countDown(); |
| | | } |
| | | } |
| | | latch = importedContainers.get(container); |
| | | } |
| | | latch.await(); |
| | | |
| | | container.importEntry(transaction, entryID, entry); |
| | | importedCount.incrementAndGet(); |
| | | } |
| | | }); |
| | | } |
| | | finally |
| | | { |
| | | closeSilently(source); |
| | | } |
| | | phaseOneTimeMs = System.currentTimeMillis() - phaseOneStartTime; |
| | | |
| | | if (source.isCancelled()) |
| | |
| | | this.sorter = sorter; |
| | | } |
| | | |
| | | abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException; |
| | | |
| | | void beforePhaseOne(EntryContainer entryContainer) |
| | | { |
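| | | // Start from an empty container: drop any existing trees before phase one imports into them. |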
| | | entryContainer.delete(asWriteableTransaction(importer)); |
| | |
| | | } |
| | | |
| | | final Callable<Void> newDN2IDImporterTask(TreeName treeName, final Chunk source, |
| | | PhaseTwoProgressReporter progressReporter) |
| | | { |
| | | final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN()); |
| | | final ID2Entry id2entry = entryContainer.getID2Entry(); |
| | | final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount(); |
| | | |
| | | return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, id2entry, entryContainer.getDN2ID(), |
| | | source, id2count, newPhaseTwoCollector(entryContainer, id2count.getName())); |
| | | } |
| | | |
| | | final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source, |
| | |
| | | } |
| | | |
| | | /** |
| | | * During phase one, all {@link TreeName}s except id2entry are imported into a dedicated, temporary |
| | | * {@link ExternalSortChunk}, which sorts the keys in ascending order. Phase two then copies the sorted keys into |
| | | * the database using the {@link Importer}. The id2entry tree is copied directly into the database through |
| | | * {@link ImporterToChunkAdapter}. |
| | | */ |
| | | private static final class ExternalSortAndImportStrategy extends AbstractTwoPhaseImportStrategy |
| | | { |
| | | ExternalSortAndImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir, |
| | | BufferPool bufferPool, Executor sorter) |
| | | { |
| | | super(entryContainers, importer, tempDir, bufferPool, sorter); |
| | | } |
| | | |
| | | @Override |
| | | public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) |
| | | { |
| | | // No validation performed. All entries are considered valid. |
| | | } |
| | | |
| | | @Override |
| | | public Chunk newChunk(TreeName treeName) throws Exception |
| | | { |
| | | if (isID2Entry(treeName)) |
| | |
| | | } |
| | | else if (isDN2ID(treeName)) |
| | | { |
| | | return newDN2IDImporterTask(treeName, source, progressReporter); |
| | | } |
| | | else if (isVLVIndex(entryContainer, treeName)) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This strategy performs two validations: it ensures that there is no duplicate entry (an entry with the same DN) |
| | | * and that each entry has an existing parent. To do so, dn2id is imported directly into the database in addition |
| | | * to id2entry. Other trees are externally sorted before being imported into the database. |
| | | */ |
| | | private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements |
| | | ReadableTransaction |
| | | { |
| | | private static final int DN_CACHE_SIZE = 16; |
| | | private final LRUPresenceCache<DN> dnCache = new LRUPresenceCache<>(DN_CACHE_SIZE); |
| | | |
| | | SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir, |
| | | BufferPool bufferPool, Executor sorter) |
| | | { |
| | | super(entryContainers, importer, tempDir, bufferPool, sorter); |
| | | } |
| | | |
| | | @Override |
| | | public Chunk newChunk(TreeName treeName) throws Exception |
| | | { |
| | | if (isID2Entry(treeName)) |
| | | { |
| | | return new MostlyOrderedChunk(asChunk(treeName, importer)); |
| | | } |
| | | else if (isDN2ID(treeName)) |
| | | { |
| | | return asChunk(treeName, importer); |
| | | } |
| | | return newExternalSortChunk(treeName); |
| | | } |
| | | |
| | | @Override |
| | | public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk source, |
| | | PhaseTwoProgressReporter progressReporter) |
| | | { |
| | | final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN()); |
| | | |
| | | if (isID2Entry(treeName)) |
| | | { |
| | | return newFlushTask(source); |
| | | } |
| | | else if (isDN2ID(treeName)) |
| | | { |
| | | return newDN2IDImporterTask(treeName, source, progressReporter); |
| | | } |
| | | else if (isVLVIndex(entryContainer, treeName)) |
| | | { |
| | | return newVLVIndexImporterTask(getVLVIndex(entryContainer, treeName), source, progressReporter); |
| | | } |
| | | return newChunkCopierTask(treeName, source, progressReporter); |
| | | } |
| | | |
| | | @Override |
| | | public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException |
| | | { |
| | | final DN2ID dn2Id = entryContainer.getDN2ID(); |
| | | final DN entryDN = entry.getName(); |
| | | final DN parentDN = entryContainer.getParentWithinBase(entryDN); |
| | | |
| | | if (parentDN != null && !dnCache.contains(parentDN) && dn2Id.get(this, parentDN) == null) |
| | | { |
| | | throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN)); |
| | | } |
| | | |
| | | if (dn2Id.get(this, entryDN) != null) |
| | | { |
| | | throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry)); |
| | | } |
| | | dnCache.add(entryDN); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(TreeName treeName, ByteSequence key) |
| | | { |
| | | return importer.read(treeName, key); |
| | | } |
| | | |
| | | @Override |
| | | public Cursor<ByteString, ByteString> openCursor(TreeName treeName) |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | | } |
| | | |
| | | @Override |
| | | public long getRecordCount(TreeName treeName) |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | | } |
| | | } |
| | | |
| | | /** Import only a specific list of indexes, ignoring everything else. */ |
| | | private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy |
| | | { |
| | |
| | | { |
| | | super(entryContainers, importer, tempDir, bufferPool, sorter); |
| | | this.indexesToRebuild = new HashSet<>(indexNames.size()); |
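| | | // Index names are matched case-insensitively: store them lower-cased. |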
| | | for (String indexName : indexNames) |
| | | { |
| | | this.indexesToRebuild.add(indexName.toLowerCase()); |
| | | } |
| | |
| | | { |
| | | if (isDN2ID(treeName)) |
| | | { |
| | | return newDN2IDImporterTask(treeName, source, progressReporter); |
| | | } |
| | | else if (isVLVIndex(entryContainer, treeName)) |
| | | { |
| | |
| | | // Do nothing (flush null chunk) |
| | | return newFlushTask(source); |
| | | } |
| | | |
| | | @Override |
| | | public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException |
| | | { |
| | | // No validation performed. All entries are considered valid. |
| | | } |
| | | } |
| | | |
| | | private static <V> List<V> invokeParallel(String threadNameTemplate, Collection<Callable<V>> tasks) |
| | |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | |
| | | final CompositeCursor<ByteString, ByteString> cursor = new CompositeCursor<ByteString, ByteString>(name, cursors) |
| | | { |
| | | @Override |
| | | public void close() |
| | |
| | | } |
| | | closeSilently(channel); |
| | | } |
| | | }; |
| | | return phaseTwoDeduplicator != null |
| | | ? new CollectorCursor<>(cursor, (Collector<?, ByteString>) phaseTwoDeduplicator) |
| | | : cursor; |
| | | } |
| | | |
| | | @Override |
| | |
| | | checkThreadNotInterrupted(); |
| | | final int regionSize; |
| | | try (final FileRegion region = new FileRegion(channel, startOffset, chunk.size()); |
| | | final SequentialCursor<ByteString, ByteString> source = phaseOneDeduplicator != null |
| | | ? new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator) |
| | | : chunk.flip()) |
| | | { |
| | | regionSize = region.write(source); |
| | | } |
| | |
| | | private final Importer importer; |
| | | private final File tempDir; |
| | | private final BufferPool bufferPool; |
| | | private final ID2Entry id2entry; |
| | | private final DN2ID dn2id; |
| | | private final ID2ChildrenCount id2count; |
| | | private final Collector<?, ByteString> id2countCollector; |
| | |
| | | private final Chunk dn2IdDestination; |
| | | |
| | | DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool, |
| | | ID2Entry id2Entry, DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count, |
| | | Collector<?, ByteString> id2countCollector) |
| | | { |
| | | this.reporter = progressReporter; |
| | | this.importer = importer; |
| | | this.tempDir = tempDir; |
| | | this.bufferPool = bufferPool; |
| | | this.id2entry = id2Entry; |
| | | this.dn2id = dn2id; |
| | | this.dn2IdSourceChunk = dn2IdChunk; |
| | | this.id2count = id2count; |
| | | this.id2countCollector = id2countCollector; |
| | | this.dn2IdDestination = asChunk(dn2id.getName(), importer); |
| | | } |
| | | |
| | | @Override |
| | |
| | | id2countCollector, sameThreadExecutor()); |
| | | long totalNumberOfEntries = 0; |
| | | |
| | | final TreeVisitor<ChildrenCount> childrenCountVisitor = |
| | | new ID2CountTreeVisitorImporter(asImporter(id2CountChunk)); |
| | | try (final SequentialCursor<ByteString, ByteString> chunkCursor = |
| | | trackCursorProgress(reporter, dn2IdSourceChunk.flip()); |
| | | final DnValidationCursorDecorator validatorCursor = |
| | | new DnValidationCursorDecorator(chunkCursor, id2entry, asWriteableTransaction(importer)); |
| | | final SequentialCursor<ByteString, ByteString> dn2idCursor = |
| | | dn2id.openCursor(validatorCursor, childrenCountVisitor)) |
| | | { |
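| | | // Cursoring through the decorator validates each DN (duplicate/orphan detection) as a side effect, |
| | | // while the tree visitor accumulates the per-parent children counts. |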
| | | while (dn2idCursor.next()) |
| | | { |
| | | dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue()); |
| | | totalNumberOfEntries++; |
| | | checkThreadNotInterrupted(); |
| | | } |
| | | } |
| | | id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries)); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Throws a {@link StorageRuntimeException} when a duplicate or orphan DN is detected. The DNs returned by the |
| | | * decorated cursor must be sorted. |
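| | | * <p> |
| | | * Illustrative (hypothetical) example: cursoring over the sorted keys for "dc=example", then |
| | | * "ou=people,dc=example", then "uid=user.0,ou=people,dc=example" succeeds, whereas visiting the uid entry |
| | | * without having first seen its "ou=people" parent raises the orphan error. |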
| | | */ |
| | | static final class DnValidationCursorDecorator extends |
| | | SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString> |
| | | { |
| | | private final LinkedList<ByteString> parentDns = new LinkedList<>(); |
| | | private final ID2Entry id2entry; |
| | | private final ReadableTransaction txn; |
| | | |
| | | DnValidationCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, ID2Entry id2entry, |
| | | ReadableTransaction txn) |
| | | { |
| | | super(delegate); |
| | | this.id2entry = id2entry; |
| | | this.txn = txn; |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | if (!delegate.next()) |
| | | { |
| | | return false; |
| | | } |
| | | final ByteString dn = delegate.getKey(); |
| | | try |
| | | { |
| | | throwIfDuplicate(dn); |
| | | throwIfOrphan(dn); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | parentDns.add(dn); |
| | | return true; |
| | | } |
| | | |
| | | private void throwIfDuplicate(ByteString dn) throws DirectoryException |
| | | { |
| | | if (dn.equals(parentDns.peekLast())) |
| | | { |
| | | throw new DirectoryException(ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(getDnAsString())); |
| | | } |
| | | } |
| | | |
| | | private String getDnAsString() |
| | | { |
| | | try |
| | | { |
| | | return id2entry.get(txn, new EntryID(delegate.getValue())).getName().toString(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | return DnKeyFormat.keyToDNString(delegate.getKey()); |
| | | } |
| | | } |
| | | |
| | | private void throwIfOrphan(ByteString dn) throws DirectoryException |
| | | { |
| | | if (!parentExists(dn)) |
| | | { |
| | | throw new DirectoryException(NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(getDnAsString())); |
| | | } |
| | | } |
| | | |
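| | | /** |
| | | * parentDns acts as a stack holding the previously visited DN and its ancestors. Because the decorated |
| | | * cursor returns DNs in sorted order, an entry's parent must already be on that stack (once siblings |
| | | * and their descendants are popped) for the entry not to be an orphan. |
| | | */ |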
| | | private boolean parentExists(ByteString childDn) |
| | | { |
| | | final Iterator<ByteString> it = parentDns.descendingIterator(); |
| | | int i = parentDns.size(); |
| | | while (it.hasNext()) |
| | | { |
| | | if (DnKeyFormat.isChild(it.next(), childDn)) |
| | | { |
| | | if (i < parentDns.size()) |
| | | { |
| | | // Reset the last element in the stack to be the parentDn: |
| | | // (removes siblings, nephews, grand-nephews, etc. of childDn) |
| | | parentDns.subList(i, parentDns.size()).clear(); |
| | | } |
| | | return true; |
| | | } |
| | | i--; |
| | | } |
| | | // First DN must represent the base-dn which is encoded as an empty ByteString. |
| | | return parentDns.isEmpty() && childDn.isEmpty(); |
| | | } |
| | | } |
| | | |
| | | private static Importer asImporter(Chunk chunk) |
| | | { |
| | | return new ChunkToImporterAdapter(chunk); |
| | |
| | | // key conflicts == sum values |
| | | return ID2ChildrenCount.getSumLongCollectorInstance(); |
| | | } |
| | | else if (isDN2ID(treeName)) |
| | | { |
| | | // Duplicate DN detection is performed during phase two by the DN2IDImporterTask |
| | | return null; |
| | | } |
| | | else if (isDN2URI(treeName) || isVLVIndex(entryContainer, treeName)) |
| | | { |
| | | // key conflicts == exception |
| | | return UniqueValueCollector.getInstance(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Thread-safe fixed-size cache which, once full, removes the least recently accessed entry. Composition is used |
| | | * here to ensure that only methods generating entry accesses in the LinkedHashMap are actually used. Otherwise, |
| | | * the least-recently-used property of the cache would not be respected. |
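| | | * <p> |
| | | * Typical use (see {@code validate()} above): call {@code add(entryDN)} once an entry is accepted, then |
| | | * {@code contains(parentDN)} to skip a dn2id lookup for a recently seen parent. |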
| | | */ |
| | | private static final class LRUPresenceCache<T> |
| | | { |
| | | /** Non-null sentinel stored for every key so that get(key) != null reports presence while recording an access. */ |
| | | private static final Object PRESENT = new Object(); |
| | | private final Map<T, Object> cache; |
| | | |
| | | LRUPresenceCache(final int maxEntries) |
| | | { |
| | | // +1 because newly added entry is added before the least recently one is removed. |
| | | this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true) |
| | | { |
| | | private static final long serialVersionUID = 1L; |
| | | |
| | | @Override |
| | | protected boolean removeEldestEntry(Map.Entry<T, Object> eldest) |
| | | { |
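| | | // removeEldestEntry() is invoked after insertion, so eviction triggers once the map |
| | | // momentarily holds maxEntries + 1 entries. |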
| | | return size() > maxEntries; |
| | | } |
| | | }); |
| | | } |
| | | |
| | | public boolean contains(T object) |
| | | { |
| | | return cache.get(object) != null; |
| | | } |
| | | |
| | | public void add(T object) |
| | | { |
| | | cache.put(object, PRESENT); |
| | | } |
| | | } |
| | | |
| | | private static WriteableTransaction asWriteableTransaction(Importer importer) |
| | | { |
| | | return new ImporterToWriteableTransactionAdapter(importer); |