| | |
| | | */ |
| | | package org.opends.server.backends.jeb; |
| | | |
| | | import com.sleepycat.je.*; |
| | | |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.messages.JebMessages; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.messages.JebMessages.*; |
| | | import org.opends.server.admin.std.server.JEBackendCfg; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.config.ConfigException; |
| | | |
| | | /** |
| | | * Import from LDIF to a JE backend. |
| | |
| | | */ |
| | | private int importedCount; |
| | | |
| | | /** |
| | | * The number of entries migrated. |
| | | */ |
| | | private int migratedCount; |
| | | |
| | | /** |
| | | * The number of merge passes. |
| | | */ |
| | | int mergePassNumber = 1; |
| | | |
| | | |
| | | /** |
| | | * The number of milliseconds between job progress reports. |
| | | */ |
| | | private long progressInterval = 10000; |
| | | |
| | | /** |
| | | * The progress report timer. |
| | | */ |
| | | private Timer timer; |
| | | |
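| | | /** |
| | | * The number of entries processed since the start of the current import |
| | | * pass. |
| | | */ |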
| | | private int entriesProcessed; |
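| | | |
| | | /** |
| | | * The import pass size: the number of entries to process before an |
| | | * intermediate merge is performed. |
| | | */ |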
| | | private int importPassSize; |
| | | |
| | | |
| | | /** |
| | | * The import worker threads. |
| | |
| | | * @throws IOException If a problem occurs while opening the LDIF file for |
| | | * reading, or while reading from the LDIF file. |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | * @throws DirectoryException if a directory server related error occurs. |
| | | * @throws ConfigException if a configuration related error occurs. |
| | | */ |
| | | public LDIFImportResult importLDIF(RootContainer rootContainer) |
| | | throws DatabaseException, IOException, JebException, DirectoryException, |
| | | ConfigException |
| | | { |
| | | |
| | | // Create an LDIF reader. Throws an exception if the file does not exist. |
| | | reader = new LDIFReader(ldifImportConfig); |
| | | this.rootContainer = rootContainer; |
| | | this.config = rootContainer.getConfiguration(); |
| | | this.mergePassNumber = 1; |
| | | this.entriesProcessed = 0; |
| | | this.importPassSize = config.getBackendImportPassSize(); |
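| | | // A pass size of zero or less is taken to mean that the entire LDIF is |
| | | // processed in a single pass, with no intermediate merges. |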
| | | if (importPassSize <= 0) |
| | | { |
| | | importPassSize = Integer.MAX_VALUE; |
| | | } |
| | | |
| | | int msgID; |
| | | String message; |
| | |
| | | TRACER.debugInfo(message); |
| | | } |
| | | |
| | | // Create the import contexts for each base DN. |
| | | DN baseDN; |
| | | |
| | | for (EntryContainer entryContainer : rootContainer.getEntryContainers()) |
| | | { |
| | | baseDN = entryContainer.getBaseDN(); |
| | | ImportContext importContext = |
| | | getImportContext(entryContainer, bufferSize); |
| | | if(importContext != null) |
| | | { |
| | | importMap.put(baseDN, importContext); |
| | | } |
| | | } |
| | | |
| | | // Make a note of the time we started. |
| | |
| | | } |
| | | } |
| | | |
| | | startWorkerThreads(); |
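| | | // The import runs in three phases: migrate existing entries that fall |
| | | // outside the include branches, process the LDIF itself, then migrate |
| | | // entries under explicitly excluded branches. The final merge always |
| | | // runs in the finally block below. |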
| | | try |
| | | { |
| | | importedCount = 0; |
| | | migratedCount = 0; |
| | | migrateExistingEntries(); |
| | | processLDIF(); |
| | | migrateExcludedEntries(); |
| | | } |
| | | finally |
| | | { |
| | | merge(false); |
| | | tempDir.delete(); |
| | | |
| | | for(ImportContext importContext : importMap.values()) |
| | | { |
| | | DN baseDN = importContext.getBaseDN(); |
| | | EntryContainer srcEntryContainer = |
| | | importContext.getSrcEntryContainer(); |
| | | if(srcEntryContainer != null) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Deleteing old entry container for base DN " + |
| | | "%s and renaming temp entry container", baseDN); |
| | | } |
| | | EntryContainer unregEC = |
| | | rootContainer.unregisterEntryContainer(baseDN); |
| | | //Make sure the unregistered EC for the base DN is the same as |
| | | //the one in the import context. |
| | | if(unregEC != srcEntryContainer) |
| | | { |
| | | if(debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Current entry container used for base DN " + |
| | | "%s is not the same as the source entry container used " + |
| | | "during the migration process.", baseDN); |
| | | } |
| | | rootContainer.registerEntryContainer(baseDN, unregEC); |
| | | continue; |
| | | } |
| | | srcEntryContainer.exclusiveLock.lock(); |
| | | srcEntryContainer.delete(); |
| | | srcEntryContainer.exclusiveLock.unlock(); |
| | | EntryContainer newEC = importContext.getEntryContainer(); |
| | | newEC.exclusiveLock.lock(); |
| | | newEC.setDatabasePrefix(baseDN.toNormalizedString()); |
| | | newEC.exclusiveLock.unlock(); |
| | | rootContainer.registerEntryContainer(baseDN, newEC); |
| | | } |
| | | } |
| | | } |
| | | |
| | | msgID = MSGID_JEB_IMPORT_FINAL_STATUS; |
| | | message = getMessage(msgID, reader.getEntriesRead(), |
| | | importedCount - migratedCount, |
| | | reader.getEntriesIgnored(), |
| | | reader.getEntriesRejected(), |
| | | migratedCount, importTime/1000, rate); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | |
| | | |
| | | /** |
| | | * Merge the intermediate files to load the index databases. |
| | | * |
| | | * @param moreData <CODE>true</CODE> if this is an intermediate merge or |
| | | * <CODE>false</CODE> if this is a final merge. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | */ |
| | | private void merge(boolean moreData) throws DatabaseException |
| | | { |
| | | ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>(); |
| | | stopWorkerThreads(); |
| | | |
| | | // Create merge threads for each base DN. |
| | | for (ImportContext importContext : importMap.values()) |
| | | try |
| | | { |
| | | EntryContainer entryContainer = importContext.getEntryContainer(); |
| | | |
| | | // For each configured attribute index. |
| | | for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes()) |
| | | if (moreData) |
| | | { |
| | | int indexEntryLimit = config.getBackendIndexEntryLimit(); |
| | | if(attrIndex.getConfiguration().getIndexEntryLimit() != null) |
| | | int msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE; |
| | | String message = getMessage(msgID, mergePassNumber++); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | int msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE; |
| | | String message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | |
| | | |
| | | long mergeStartTime = System.currentTimeMillis(); |
| | | |
| | | ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>(); |
| | | |
| | | // Create merge threads for each base DN. |
| | | for (ImportContext importContext : importMap.values()) |
| | | { |
| | | EntryContainer entryContainer = importContext.getEntryContainer(); |
| | | |
| | | // For each configured attribute index. |
| | | for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes()) |
| | | { |
| | | int indexEntryLimit = config.getBackendIndexEntryLimit(); |
| | | if(attrIndex.getConfiguration().getIndexEntryLimit() != null) |
| | | { |
| | | indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit(); |
| | | } |
| | | |
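| | | // Create one merge thread per index database configured for this |
| | | // attribute so that the intermediate files are merged in parallel. |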
| | | if (attrIndex.equalityIndex != null) |
| | | { |
| | | Index index = attrIndex.equalityIndex; |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (attrIndex.presenceIndex != null) |
| | | { |
| | | Index index = attrIndex.presenceIndex; |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (attrIndex.substringIndex != null) |
| | | { |
| | | Index index = attrIndex.substringIndex; |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (attrIndex.orderingIndex != null) |
| | | { |
| | | Index index = attrIndex.orderingIndex; |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (attrIndex.approximateIndex != null) |
| | | { |
| | | Index index = attrIndex.approximateIndex; |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | } |
| | | |
| | | // Id2Children index. |
| | | Index id2Children = entryContainer.getID2Children(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, |
| | | id2Children, |
| | | config.getBackendIndexEntryLimit()); |
| | | mergers.add(indexMergeThread); |
| | | |
| | | // Id2Subtree index. |
| | | Index id2Subtree = entryContainer.getID2Subtree(); |
| | | indexMergeThread = |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, |
| | | id2Subtree, |
| | | config.getBackendIndexEntryLimit()); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | |
| | | // Run all the merge threads. |
| | | for (IndexMergeThread imt : mergers) |
| | | { |
| | | imt.start(); |
| | | } |
| | | |
| | | // Wait for the threads to finish. |
| | | for (IndexMergeThread imt : mergers) |
| | | { |
| | | try |
| | | { |
| | | imt.join(); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | long mergeEndTime = System.currentTimeMillis(); |
| | | |
| | | if (moreData) |
| | | { |
| | | int msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING; |
| | | String message = |
| | | getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | int msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED; |
| | | String message = |
| | | getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | finally |
| | | { |
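| | | // For an intermediate merge the import is not finished, so restart |
| | | // the worker threads before LDIF processing resumes. |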
| | | if(moreData) |
| | | { |
| | | startWorkerThreads(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Create a set of worker threads, one set for each base DN, and start the |
| | | * progress report timer. |
| | | * |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | */ |
| | | private void startWorkerThreads() throws DatabaseException |
| | | { |
| | | |
| | | // Create one set of worker threads for each base DN. |
| | | int importThreadCount = config.getBackendImportThreadCount(); |
| | | for (ImportContext ic : importMap.values()) |
| | |
| | | } |
| | | } |
| | | |
| | | // Start a timer for the progress report. |
| | | timer = new Timer(); |
| | | TimerTask progressTask = new ImportJob.ProgressTask(); |
| | | timer.scheduleAtFixedRate(progressTask, progressInterval, |
| | | progressInterval); |
| | | } |
| | | |
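| | | /** |
| | | * Stop the worker threads: wait for the entry queues to drain, order each |
| | | * thread to stop, wait for the threads to finish, and cancel the progress |
| | | * report timer. |
| | | */ |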
| | | private void stopWorkerThreads() |
| | | { |
| | | if(threads.size() > 0) |
| | | { |
| | | // Wait for the queues to be drained. |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | while (ic.getQueue().size() > 0) |
| | | { |
| | | try |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (Exception e) |
| | | { |
| | | // No action needed. |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | if(threads.size() > 0) |
| | | // Order the threads to stop. |
| | | for (ImportThread t : threads) |
| | | { |
| | | t.stopProcessing(); |
| | | } |
| | | |
| | | // Wait for each thread to stop. |
| | | for (ImportThread t : threads) |
| | | { |
| | | try |
| | | { |
| | | t.join(); |
| | | importedCount += t.getImportedCount(); |
| | | } |
| | | catch (InterruptedException ie) |
| | | { |
| | | // No action needed? |
| | | } |
| | | } |
| | | |
| | | timer.cancel(); |
| | | } |
| | | |
| | | /** |
| | | * Create a set of worker threads, one set for each base DN. |
| | | * Read each entry from the LDIF and determine which |
| | | * base DN the entry belongs to. Write the dn2id database, then put the |
| | | * entry on the appropriate queue for the worker threads to consume. |
| | | * Record the entry count for each base DN when all entries have been |
| | | * processed. |
| | | * |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | * @throws IOException If a problem occurs while opening the LDIF file for |
| | | * reading, or while reading from the LDIF file. |
| | | */ |
| | | private void processLDIF() |
| | | throws JebException, DatabaseException, IOException |
| | | { |
| | | int msgID = MSGID_JEB_IMPORT_LDIF_START; |
| | | String message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | do |
| | | { |
| | | if(threads.size() <= 0) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS; |
| | | message = getMessage(msgID); |
| | | throw new JebException(msgID, message); |
| | | } |
| | | try |
| | | { |
| | | // Read the next entry. |
| | | Entry entry = reader.readEntry(); |
| | | |
| | | // Check for end of file. |
| | | if (entry == null) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_LDIF_END; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | break; |
| | | } |
| | | |
| | | // Route it according to base DN. |
| | | ImportContext importContext = getImportConfig(entry.getDN()); |
| | | |
| | | processEntry(importContext, entry); |
| | | |
| | | entriesProcessed++; |
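| | | // When the configured import pass size is reached, run an intermediate |
| | | // merge and reset the counter before continuing with the rest of the |
| | | // LDIF. |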
| | | if (entriesProcessed >= importPassSize) |
| | | { |
| | | merge(true); |
| | | entriesProcessed = 0; |
| | | } |
| | | } |
| | | catch (LDIFException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } while (true); |
| | | } |
| | | |
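| | | /** |
| | | * Migrate any existing entries that fall outside the include branches of |
| | | * the import from the source entry container into the new entry container, |
| | | * so that they are preserved. |
| | | * |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | * @throws DirectoryException If a directory server related error occurs. |
| | | */ |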
| | | private void migrateExistingEntries() |
| | | throws JebException, DatabaseException, DirectoryException |
| | | { |
| | | for(ImportContext importContext : importMap.values()) |
| | | { |
| | | EntryContainer srcEntryContainer = importContext.getSrcEntryContainer(); |
| | | if(srcEntryContainer != null && |
| | | !importContext.getIncludeBranches().isEmpty()) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | LockMode lockMode = LockMode.DEFAULT; |
| | | OperationStatus status; |
| | | |
| | | int msgID = MSGID_JEB_IMPORT_MIGRATION_START; |
| | | String message = getMessage(msgID, "existing", |
| | | importContext.getBaseDN()); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | Cursor cursor = |
| | | srcEntryContainer.getDN2ID().openCursor(null, |
| | | CursorConfig.READ_COMMITTED); |
| | | try |
| | | { |
| | | status = cursor.getFirst(key, data, lockMode); |
| | | |
| | | while(status == OperationStatus.SUCCESS) |
| | | { |
| | | if(threads.size() <= 0) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS; |
| | | message = getMessage(msgID); |
| | | throw new JebException(msgID, message); |
| | | } |
| | | |
| | | DN dn = DN.decode(new ASN1OctetString(key.getData())); |
| | | if(!importContext.getIncludeBranches().contains(dn)) |
| | | { |
| | | EntryID id = new EntryID(data); |
| | | Entry entry = srcEntryContainer.getID2Entry().get(null, id); |
| | | processEntry(importContext, entry); |
| | | |
| | | entriesProcessed++; |
| | | migratedCount++; |
| | | if (entriesProcessed >= importPassSize) |
| | | { |
| | | merge(true); |
| | | entriesProcessed = 0; |
| | | } |
| | | status = cursor.getNext(key, data, lockMode); |
| | | } |
| | | else |
| | | { |
| | | // This is the base entry for a branch that will be included |
| | | // in the import so we don't want to copy the branch to the new |
| | | // entry container. |
| | | |
| | | /** |
| | | * Advance the cursor to next entry at the same level in the DIT |
| | | * skipping all the entries in this branch. |
| | | * Set the next starting value to a value of equal length but |
| | | * slightly greater than the previous DN. Since keys are compared |
| | | * in reverse order we must set the first byte (the comma). |
| | | * No possibility of overflow here. |
| | | */ |
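| | | // E.g. the leading "," (0x2C) becomes "-" (0x2D), so the new key sorts |
| | | // after every DN ending with ",<dn>" under the reverse-order comparator |
| | | // and getSearchKeyRange lands on the first entry outside this branch. |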
| | | byte[] begin = |
| | | StaticUtils.getBytes("," + dn.toNormalizedString()); |
| | | begin[0] = (byte) (begin[0] + 1); |
| | | key.setData(begin); |
| | | status = cursor.getSearchKeyRange(key, data, lockMode); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
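| | | /** |
| | | * Migrate entries under branches that were explicitly excluded from the |
| | | * import from the source entry container into the new entry container, |
| | | * so that they are preserved. |
| | | * |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | */ |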
| | | private void migrateExcludedEntries() |
| | | throws JebException, DatabaseException |
| | | { |
| | | for(ImportContext importContext : importMap.values()) |
| | | { |
| | | EntryContainer srcEntryContainer = importContext.getSrcEntryContainer(); |
| | | if(srcEntryContainer != null && |
| | | !importContext.getExcludeBranches().isEmpty()) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | LockMode lockMode = LockMode.DEFAULT; |
| | | OperationStatus status; |
| | | |
| | | int msgID = MSGID_JEB_IMPORT_MIGRATION_START; |
| | | String message = getMessage(msgID, "excluded", |
| | | importContext.getBaseDN()); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | Cursor cursor = |
| | | srcEntryContainer.getDN2ID().openCursor(null, |
| | | CursorConfig.READ_COMMITTED); |
| | | Comparator<byte[]> dn2idComparator = |
| | | srcEntryContainer.getDN2ID().getComparator(); |
| | | try |
| | | { |
| | | for(DN excludedDN : importContext.getExcludeBranches()) |
| | | { |
| | | byte[] suffix = |
| | | StaticUtils.getBytes(excludedDN.toNormalizedString()); |
| | | key.setData(suffix); |
| | | status = cursor.getSearchKeyRange(key, data, lockMode); |
| | | |
| | | if(status == OperationStatus.SUCCESS && |
| | | Arrays.equals(key.getData(), suffix)) |
| | | { |
| | | // This is the base entry for a branch that was excluded in the |
| | | // import so we must migrate all entries in this branch over to |
| | | // the new entry container. |
| | | |
| | | byte[] end = |
| | | StaticUtils.getBytes("," + excludedDN.toNormalizedString()); |
| | | end[0] = (byte) (end[0] + 1); |
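| | | // E.g. the leading "," (0x2C) becomes "-" (0x2D), giving an end key that |
| | | // sorts after every DN ending with ",<excludedDN>" under the |
| | | // reverse-order comparator. |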
| | | |
| | | while(status == OperationStatus.SUCCESS && |
| | | dn2idComparator.compare(key.getData(), end) < 0) |
| | | { |
| | | if(threads.size() <= 0) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS; |
| | | message = getMessage(msgID); |
| | | throw new JebException(msgID, message); |
| | | } |
| | | |
| | | EntryID id = new EntryID(data); |
| | | Entry entry = srcEntryContainer.getID2Entry().get(null, id); |
| | | processEntry(importContext, entry); |
| | | |
| | | entriesProcessed++; |
| | | migratedCount++; |
| | | if (entriesProcessed >= importPassSize) |
| | | { |
| | | merge(true); |
| | | entriesProcessed = 0; |
| | | } |
| | | status = cursor.getNext(key, data, lockMode); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | return importContext; |
| | | } |
| | | |
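| | | /** |
| | | * Create an import context for the provided entry container, honouring the |
| | | * configured include and exclude branches. |
| | | * |
| | | * @param entryContainer The entry container of the base DN to be imported. |
| | | * @param bufferSize The buffer size to use for the import context. |
| | | * @return The import context, or null if this base DN is skipped entirely. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | * @throws ConfigException If a configuration related error occurs. |
| | | */ |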
| | | private ImportContext getImportContext(EntryContainer entryContainer, |
| | | long bufferSize) |
| | | throws DatabaseException, JebException, ConfigException |
| | | { |
| | | DN baseDN = entryContainer.getBaseDN(); |
| | | EntryContainer srcEntryContainer = null; |
| | | List<DN> includeBranches = new ArrayList<DN>(); |
| | | List<DN> excludeBranches = new ArrayList<DN>(); |
| | | |
| | | if(!ldifImportConfig.appendToExistingData() && |
| | | !ldifImportConfig.clearBackend()) |
| | | { |
| | | for(DN dn : ldifImportConfig.getExcludeBranches()) |
| | | { |
| | | if(baseDN.equals(dn)) |
| | | { |
| | | // This entire base DN was explicitly excluded. Skip. |
| | | return null; |
| | | } |
| | | if(baseDN.isAncestorOf(dn)) |
| | | { |
| | | excludeBranches.add(dn); |
| | | } |
| | | } |
| | | |
| | | if(!ldifImportConfig.getIncludeBranches().isEmpty()) |
| | | { |
| | | for(DN dn : ldifImportConfig.getIncludeBranches()) |
| | | { |
| | | if(baseDN.isAncestorOf(dn)) |
| | | { |
| | | includeBranches.add(dn); |
| | | } |
| | | } |
| | | |
| | | if(includeBranches.isEmpty()) |
| | | { |
| | | // There are no branches in the explicitly defined include list under |
| | | // this base DN. Skip this base DN altogether. |
| | | |
| | | return null; |
| | | } |
| | | |
| | | // Remove any overlapping include branches. Use an iterator so that |
| | | // branches can be removed while iterating without triggering a |
| | | // ConcurrentModificationException. |
| | | Iterator<DN> includeBranchIterator = includeBranches.iterator(); |
| | | while(includeBranchIterator.hasNext()) |
| | | { |
| | | DN includeDN = includeBranchIterator.next(); |
| | | boolean keep = true; |
| | | for(DN dn : includeBranches) |
| | | { |
| | | if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN)) |
| | | { |
| | | keep = false; |
| | | break; |
| | | } |
| | | } |
| | | if(!keep) |
| | | { |
| | | includeBranchIterator.remove(); |
| | | } |
| | | } |
| | | |
| | | // Remove any exclude branches that are not under an include branch, |
| | | // since they will be migrated as part of the existing entries outside of |
| | | // the include branches anyway. Again use an iterator so that removal |
| | | // during iteration is safe. |
| | | Iterator<DN> excludeBranchIterator = excludeBranches.iterator(); |
| | | while(excludeBranchIterator.hasNext()) |
| | | { |
| | | DN excludeDN = excludeBranchIterator.next(); |
| | | boolean keep = false; |
| | | for(DN includeDN : includeBranches) |
| | | { |
| | | if(includeDN.isAncestorOf(excludeDN)) |
| | | { |
| | | keep = true; |
| | | break; |
| | | } |
| | | } |
| | | if(!keep) |
| | | { |
| | | excludeBranchIterator.remove(); |
| | | } |
| | | } |
| | | |
| | | if(includeBranches.size() == 1 && excludeBranches.size() == 0 && |
| | | includeBranches.get(0).equals(baseDN)) |
| | | { |
| | | // This entire base DN is explicitly included in the import with |
| | | // no exclude branches that we need to migrate. Just clear the entry |
| | | // container. |
| | | entryContainer.exclusiveLock.lock(); |
| | | entryContainer.clear(); |
| | | entryContainer.exclusiveLock.unlock(); |
| | | } |
| | | else |
| | | { |
| | | // Create a temp entry container |
| | | srcEntryContainer = entryContainer; |
| | | entryContainer = |
| | | rootContainer.openEntryContainer(baseDN, |
| | | baseDN.toNormalizedString() + |
| | | "_importTmp"); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Create an import context. |
| | | ImportContext importContext = new ImportContext(); |
| | | importContext.setBufferSize(bufferSize); |
| | | importContext.setConfig(config); |
| | | importContext.setLDIFImportConfig(this.ldifImportConfig); |
| | | importContext.setLDIFReader(reader); |
| | | |
| | | importContext.setBaseDN(baseDN); |
| | | importContext.setEntryContainer(entryContainer); |
| | | importContext.setSrcEntryContainer(srcEntryContainer); |
| | | |
| | | // Create an entry queue. |
| | | LinkedBlockingQueue<Entry> queue = |
| | | new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize()); |
| | | importContext.setQueue(queue); |
| | | |
| | | // Set the include and exclude branches |
| | | importContext.setIncludeBranches(includeBranches); |
| | | importContext.setExcludeBranches(excludeBranches); |
| | | |
| | | return importContext; |
| | | } |
| | | |
| | | /** |
| | | * This class reports progress of the import job at fixed intervals. |
| | | */ |
| | |
| | | */ |
| | | public void run() |
| | | { |
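| | | // Include migrated entries in the count, since they are copied from the |
| | | // existing entry container rather than read through the LDIF reader. |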
| | | long latestCount = reader.getEntriesRead() + migratedCount; |
| | | long deltaCount = (latestCount - previousCount); |
| | | long latestTime = System.currentTimeMillis(); |
| | | long deltaTime = latestTime - previousTime; |
| | |
| | | |
| | | int msgID = MSGID_JEB_IMPORT_PROGRESS_REPORT; |
| | | String message = getMessage(msgID, numRead, numIgnored, numRejected, |
| | | migratedCount, rate); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |