| | |
| | | private final TmpEnv tmpEnv; |
| | | |
| | | /** Root container. */ |
| | | private RootContainer rootContainer; |
| | | private final RootContainer rootContainer; |
| | | |
| | | /** Import configuration. */ |
| | | private final LDIFImportConfig importConfiguration; |
| | |
| | | * @throws ConfigException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | Importer(RebuildConfig rebuildConfig, PluggableBackendCfg cfg, ServerContext serverContext) |
| | | throws InitializationException, StorageRuntimeException, ConfigException |
| | | Importer(RootContainer rootContainer, RebuildConfig rebuildConfig, PluggableBackendCfg cfg, |
| | | ServerContext serverContext) throws InitializationException, StorageRuntimeException, ConfigException |
| | | { |
| | | this.rootContainer = rootContainer; |
| | | this.importConfiguration = null; |
| | | this.serverContext = serverContext; |
| | | this.tmpEnv = null; |
| | |
| | | * @throws StorageRuntimeException |
| | | * If an error occurred when opening the DB. |
| | | */ |
| | | Importer(LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg, ServerContext serverContext) |
| | | throws InitializationException, ConfigException, StorageRuntimeException |
| | | Importer(RootContainer rootContainer, LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg, |
| | | ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException |
| | | { |
| | | this.rootContainer = rootContainer; |
| | | this.rebuildManager = null; |
| | | this.importConfiguration = importConfiguration; |
| | | this.serverContext = serverContext; |
| | |
| | | * @throws ExecutionException |
| | | * If an execution error occurred. |
| | | */ |
| | | public void rebuildIndexes(RootContainer rootContainer) |
| | | throws ConfigException, InitializationException, StorageRuntimeException, |
| | | public void rebuildIndexes() throws ConfigException, InitializationException, StorageRuntimeException, |
| | | InterruptedException, ExecutionException |
| | | { |
| | | this.rootContainer = rootContainer; |
| | | |
| | | try |
| | | { |
| | | if (rebuildManager.rebuildConfig.isClearDegradedState()) |
| | |
| | | } |
| | | else |
| | | { |
| | | rebuildIndexes(); |
| | | doRebuildIndexes(); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | }); |
| | | } |
| | | |
| | | private void rebuildIndexes() throws Exception |
| | | private void doRebuildIndexes() throws Exception |
| | | { |
| | | final long startTime = System.currentTimeMillis(); |
| | | final Storage storage = rootContainer.getStorage(); |
| | |
| | | * @throws Exception |
| | | * If the import failed |
| | | */ |
| | | public LDIFImportResult processImport(RootContainer rootContainer) throws Exception |
| | | public LDIFImportResult processImport() throws Exception |
| | | { |
| | | this.rootContainer = rootContainer; |
| | | try { |
| | | try |
| | | { |
| | |
| | | final ExecutorService execService = Executors.newFixedThreadPool(threadCount); |
| | | |
| | | final Storage storage = rootContainer.getStorage(); |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | execService.submit(new MigrateExistingTask(txn)).get(); |
| | | } |
| | | }); |
| | | execService.submit(new MigrateExistingTask(storage)).get(); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); |
| | | if (importConfiguration.appendToExistingData() |
| | |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new AppendReplaceTask(storage.getWriteableTransaction())); |
| | | tasks.add(new AppendReplaceTask(storage)); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new ImportTask(storage.getWriteableTransaction())); |
| | | tasks.add(new ImportTask(storage)); |
| | | } |
| | | } |
| | | execService.invokeAll(tasks); |
| | | tasks.clear(); |
| | | |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | execService.submit(new MigrateExcludedTask(txn)).get(); |
| | | } |
| | | }); |
| | | execService.submit(new MigrateExcludedTask(storage)).get(); |
| | | |
| | | stopScratchFileWriters(); |
| | | getAll(scratchFileWriterFutures); |
| | |
| | | /** Task used to migrate excluded branch. */ |
| | | private final class MigrateExcludedTask extends ImportTask |
| | | { |
| | | private MigrateExcludedTask(final WriteableTransaction txn) |
| | | private MigrateExcludedTask(final Storage storage) |
| | | { |
| | | super(txn); |
| | | super(storage); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | Void call0(WriteableTransaction txn) throws Exception |
| | | { |
| | | for (Suffix suffix : dnSuffixMap.values()) |
| | | { |
| | |
| | | { |
| | | EntryID id = new EntryID(cursor.getValue()); |
| | | Entry entry = entryContainer.getID2Entry().get(txn, id); |
| | | processEntry(entry, rootContainer.getNextEntryID(), suffix); |
| | | processEntry(txn, entry, rootContainer.getNextEntryID(), suffix); |
| | | migratedCount++; |
| | | success = cursor.next(); |
| | | } |
| | |
| | | /** Task to migrate existing entries. */ |
| | | private final class MigrateExistingTask extends ImportTask |
| | | { |
| | | private MigrateExistingTask(final WriteableTransaction txn) |
| | | private MigrateExistingTask(final Storage storage) |
| | | { |
| | | super(txn); |
| | | super(storage); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | Void call0(WriteableTransaction txn) throws Exception |
| | | { |
| | | for (Suffix suffix : dnSuffixMap.values()) |
| | | { |
| | |
| | | { |
| | | EntryID id = new EntryID(key); |
| | | Entry entry = entryContainer.getID2Entry().get(txn, id); |
| | | processEntry(entry, rootContainer.getNextEntryID(), suffix); |
| | | processEntry(txn, entry, rootContainer.getNextEntryID(), suffix); |
| | | migratedCount++; |
| | | success = cursor.next(); |
| | | } |
| | |
| | | */ |
| | | private class AppendReplaceTask extends ImportTask |
| | | { |
| | | public AppendReplaceTask(final WriteableTransaction txn) |
| | | public AppendReplaceTask(final Storage storage) |
| | | { |
| | | super(txn); |
| | | super(storage); |
| | | } |
| | | |
| | | private final Set<ByteString> insertKeySet = new HashSet<ByteString>(); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | Void call0(WriteableTransaction txn) throws Exception |
| | | { |
| | | try |
| | | { |
| | |
| | | } |
| | | entryID = entryInfo.getEntryID(); |
| | | Suffix suffix = entryInfo.getSuffix(); |
| | | processEntry(entry, suffix); |
| | | processEntry(txn, entry, suffix); |
| | | } |
| | | flushIndexBuffers(); |
| | | return null; |
| | |
| | | } |
| | | } |
| | | |
| | | void processEntry(Entry entry, Suffix suffix) |
| | | void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix) |
| | | throws DirectoryException, StorageRuntimeException, InterruptedException |
| | | { |
| | | DN entryDN = entry.getName(); |
| | |
| | | } |
| | | if (oldEntry == null) |
| | | { |
| | | if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix)) |
| | | if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix)) |
| | | { |
| | | suffix.removePending(entryDN); |
| | | return; |
| | |
| | | suffix.removePending(entryDN); |
| | | entryID = oldID; |
| | | } |
| | | processDN2URI(suffix, oldEntry, entry); |
| | | processDN2URI(txn, suffix, oldEntry, entry); |
| | | suffix.getID2Entry().put(txn, entryID, entry); |
| | | if (oldEntry != null) |
| | | { |
| | |
| | | { |
| | | processIndexes(suffix, entry, entryID); |
| | | } |
| | | processVLVIndexes(suffix, entry, entryID); |
| | | processVLVIndexes(txn, suffix, entry, entryID); |
| | | importCount.getAndIncrement(); |
| | | } |
| | | |
| | |
| | | */ |
| | | private class ImportTask implements Callable<Void> |
| | | { |
| | | WriteableTransaction txn; |
| | | private final Storage storage; |
| | | private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>(); |
| | | private final Set<ByteString> insertKeySet = new HashSet<ByteString>(); |
| | | private final EntryInformation entryInfo = new EntryInformation(); |
| | | private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID, 1); |
| | | |
| | | public ImportTask(final WriteableTransaction txn) |
| | | public ImportTask(final Storage storage) |
| | | { |
| | | this.txn = txn; |
| | | this.storage = storage; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | public final Void call() throws Exception |
| | | { |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | call0(txn); |
| | | } |
| | | }); |
| | | return null; |
| | | } |
| | | |
| | | Void call0(WriteableTransaction txn) throws Exception { |
| | | try |
| | | { |
| | | while (true) |
| | |
| | | } |
| | | EntryID entryID = entryInfo.getEntryID(); |
| | | Suffix suffix = entryInfo.getSuffix(); |
| | | processEntry(entry, entryID, suffix); |
| | | processEntry(txn, entry, entryID, suffix); |
| | | } |
| | | flushIndexBuffers(); |
| | | return null; |
| | |
| | | } |
| | | } |
| | | |
| | | void processEntry(Entry entry, EntryID entryID, Suffix suffix) |
| | | void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix) |
| | | throws DirectoryException, StorageRuntimeException, InterruptedException |
| | | { |
| | | DN entryDN = entry.getName(); |
| | | if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix)) |
| | | if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix)) |
| | | { |
| | | suffix.removePending(entryDN); |
| | | return; |
| | | } |
| | | suffix.removePending(entryDN); |
| | | processDN2ID(suffix, entryDN, entryID); |
| | | processDN2URI(suffix, null, entry); |
| | | processDN2URI(txn, suffix, null, entry); |
| | | processIndexes(suffix, entry, entryID); |
| | | processVLVIndexes(suffix, entry, entryID); |
| | | processVLVIndexes(txn, suffix, entry, entryID); |
| | | suffix.getID2Entry().put(txn, entryID, entry); |
| | | importCount.getAndIncrement(); |
| | | } |
| | | |
| | | /** Examine the DN for duplicates and missing parents. */ |
| | | boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix) |
| | | boolean dnSanityCheck(WriteableTransaction txn, DN entryDN, Entry entry, Suffix suffix) |
| | | throws StorageRuntimeException, InterruptedException |
| | | { |
| | | //Perform parent checking. |
| | |
| | | } |
| | | } |
| | | |
| | | void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException |
| | | void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID) |
| | | throws DirectoryException |
| | | { |
| | | final EntryContainer entryContainer = suffix.getEntryContainer(); |
| | | final IndexBuffer buffer = new IndexBuffer(entryContainer); |
| | |
| | | indexIDToECMap.putIfAbsent(indexID, suffix.getEntryContainer()); |
| | | } |
| | | |
| | | void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry) throws StorageRuntimeException |
| | | void processDN2URI(WriteableTransaction txn, Suffix suffix, Entry oldEntry, Entry newEntry) |
| | | throws StorageRuntimeException |
| | | { |
| | | DN2URI dn2uri = suffix.getDN2URI(); |
| | | if (oldEntry != null) |
| | |
| | | */ |
| | | void printStartMessage(WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | this.txn = txn; |
| | | totalEntries = suffix.getID2Entry().getRecordCount(txn); |
| | | |
| | | switch (rebuildConfig.getRebuildMode()) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | Void call0(WriteableTransaction txn) throws Exception |
| | | { |
| | | ID2Entry id2entry = entryContainer.getID2Entry(); |
| | | Cursor<ByteString, ByteString> cursor = txn.openCursor(id2entry.getName()); |
| | |
| | | Entry entry = |
| | | ID2Entry.entryFromDatabase(cursor.getValue(), |
| | | entryContainer.getRootContainer().getCompressedSchema()); |
| | | processEntry(entry, entryID); |
| | | processEntry(txn, entry, entryID); |
| | | entriesProcessed.getAndIncrement(); |
| | | } |
| | | flushIndexBuffers(); |
| | |
| | | return result; |
| | | } |
| | | |
| | | private void processEntry(Entry entry, EntryID entryID) |
| | | private void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID) |
| | | throws DirectoryException, StorageRuntimeException, InterruptedException |
| | | { |
| | | if (dn2id != null) |
| | |
| | | } |
| | | if (dn2uri != null) |
| | | { |
| | | processDN2URI(suffix, null, entry); |
| | | processDN2URI(txn, suffix, null, entry); |
| | | } |
| | | processIndexes(entry, entryID); |
| | | processVLVIndexes(entry, entryID); |
| | | processVLVIndexes(txn, entry, entryID); |
| | | } |
| | | |
| | | private void processVLVIndexes(Entry entry, EntryID entryID) |
| | | private void processVLVIndexes(WriteableTransaction txn, Entry entry, EntryID entryID) |
| | | throws StorageRuntimeException, DirectoryException |
| | | { |
| | | final IndexBuffer buffer = new IndexBuffer(entryContainer); |