From 45e94f0f208c47806df0e8ae073c91f797f812a4 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Thu, 26 Jul 2007 16:31:34 +0000
Subject: [PATCH] This fixes issue 1971 and allows partial non-append imports of a backend:
---
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java | 896 +++++++++++++++++++++++++++++++++++++++++------------------
1 files changed, 620 insertions(+), 276 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
index 8e077a1..b06c2c2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -26,10 +26,7 @@
*/
package org.opends.server.backends.jeb;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.EnvironmentStats;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.messages.JebMessages;
@@ -48,10 +45,7 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@@ -66,6 +60,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.messages.JebMessages.*;
import org.opends.server.admin.std.server.JEBackendCfg;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.config.ConfigException;
/**
* Import from LDIF to a JE backend.
@@ -108,12 +104,30 @@
*/
private int importedCount;
+ /**
+ * The number of entries migrated.
+ */
+ private int migratedCount;
+
+ /**
+ * The number of merge passes.
+ */
+ int mergePassNumber = 1;
+
/**
* The number of milliseconds between job progress reports.
*/
private long progressInterval = 10000;
+ /**
+ * The progress report timer.
+ */
+ private Timer timer;
+
+ private int entriesProcessed;
+ private int importPassSize;
+
/**
* The import worker threads.
@@ -146,15 +160,25 @@
* @throws IOException If a problem occurs while opening the LDIF file for
* reading, or while reading from the LDIF file.
* @throws JebException If an error occurs in the JE backend.
+ * @throws DirectoryException if a directory server related error occurs.
+ * @throws ConfigException if a configuration related error occurs.
*/
public LDIFImportResult importLDIF(RootContainer rootContainer)
- throws DatabaseException, IOException, JebException
+ throws DatabaseException, IOException, JebException, DirectoryException,
+ ConfigException
{
// Create an LDIF reader. Throws an exception if the file does not exist.
reader = new LDIFReader(ldifImportConfig);
this.rootContainer = rootContainer;
this.config = rootContainer.getConfiguration();
+ this.mergePassNumber = 1;
+ this.entriesProcessed = 0;
+ this.importPassSize = config.getBackendImportPassSize();
+ if (importPassSize <= 0)
+ {
+ importPassSize = Integer.MAX_VALUE;
+ }
int msgID;
String message;
@@ -185,31 +209,15 @@
TRACER.debugInfo(message);
}
- // Create the import contexts for each base DN.
- DN baseDN;
-
for (EntryContainer entryContainer : rootContainer.getEntryContainers())
{
- baseDN = entryContainer.getBaseDN();
+ ImportContext importContext =
+ getImportContext(entryContainer, bufferSize);
- // Create an import context.
- ImportContext importContext = new ImportContext();
- importContext.setBufferSize(bufferSize);
- importContext.setConfig(config);
- importContext.setLDIFImportConfig(this.ldifImportConfig);
- importContext.setLDIFReader(reader);
-
- importContext.setBaseDN(baseDN);
- importContext.setContainerName(entryContainer.getContainerName());
- importContext.setEntryContainer(entryContainer);
- importContext.setBufferSize(bufferSize);
-
- // Create an entry queue.
- LinkedBlockingQueue<Entry> queue =
- new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
- importContext.setQueue(queue);
-
- importMap.put(baseDN, importContext);
+ if(importContext != null)
+ {
+ importMap.put(entryContainer.getBaseDN(), importContext);
+ }
}
// Make a note of the time we started.
@@ -232,53 +240,57 @@
}
}
+ startWorkerThreads();
try
{
importedCount = 0;
- int passNumber = 1;
- boolean moreData = true;
- while (moreData)
- {
- moreData = processLDIF();
- if (moreData)
- {
- msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
- message = getMessage(msgID, passNumber++);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
- }
- else
- {
- msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
- message = getMessage(msgID);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
- }
-
-
- long mergeStartTime = System.currentTimeMillis();
- merge();
- long mergeEndTime = System.currentTimeMillis();
-
- if (moreData)
- {
- msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
- message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
- }
- else
- {
- msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
- message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
- }
- }
+ migratedCount = 0;
+ migrateExistingEntries();
+ processLDIF();
+ migrateExcludedEntries();
}
finally
{
+ merge(false);
tempDir.delete();
+
+ for(ImportContext importContext : importMap.values())
+ {
+ DN baseDN = importContext.getBaseDN();
+ EntryContainer srcEntryContainer =
+ importContext.getSrcEntryContainer();
+ if(srcEntryContainer != null)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Deleteing old entry container for base DN " +
+ "%s and renaming temp entry container", baseDN);
+ }
+ EntryContainer unregEC =
+ rootContainer.unregisterEntryContainer(baseDN);
+ //Make sure the unregistered EC for the base DN is the same as
+ //the one in the import context.
+ if(unregEC != srcEntryContainer)
+ {
+ if(debugEnabled())
+ {
+ TRACER.debugInfo("Current entry container used for base DN " +
+ "%s is not the same as the source entry container used " +
+ "during the migration process.", baseDN);
+ }
+ rootContainer.registerEntryContainer(baseDN, unregEC);
+ continue;
+ }
+ srcEntryContainer.exclusiveLock.lock();
+ srcEntryContainer.delete();
+ srcEntryContainer.exclusiveLock.unlock();
+ EntryContainer newEC = importContext.getEntryContainer();
+ newEC.exclusiveLock.lock();
+ newEC.setDatabasePrefix(baseDN.toNormalizedString());
+ newEC.exclusiveLock.unlock();
+ rootContainer.registerEntryContainer(baseDN, newEC);
+ }
+ }
}
}
finally
@@ -296,9 +308,11 @@
}
msgID = MSGID_JEB_IMPORT_FINAL_STATUS;
- message = getMessage(msgID, reader.getEntriesRead(), importedCount,
+ message = getMessage(msgID, reader.getEntriesRead(),
+ importedCount - migratedCount,
reader.getEntriesIgnored(),
- reader.getEntriesRejected(), importTime/1000, rate);
+ reader.getEntriesRejected(),
+ migratedCount, importTime/1000, rate);
logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
message, msgID);
@@ -314,135 +328,169 @@
/**
* Merge the intermediate files to load the index databases.
+ *
+ * @param moreData <CODE>true</CODE> if this is a intermediate merge or
+ * <CODE>false</CODE> if this is a final merge.
+ * @throws DatabaseException If an error occurs in the JE database.
*/
- public void merge()
+ private void merge(boolean moreData) throws DatabaseException
{
- ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
+ stopWorkerThreads();
- // Create merge threads for each base DN.
- for (ImportContext importContext : importMap.values())
+ try
{
- EntryContainer entryContainer = importContext.getEntryContainer();
-
- // For each configured attribute index.
- for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
+ if (moreData)
{
- int indexEntryLimit = config.getBackendIndexEntryLimit();
- if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
+ int msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
+ String message = getMessage(msgID, mergePassNumber++);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+ else
+ {
+ int msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+
+
+ long mergeStartTime = System.currentTimeMillis();
+
+ ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
+
+ // Create merge threads for each base DN.
+ for (ImportContext importContext : importMap.values())
+ {
+ EntryContainer entryContainer = importContext.getEntryContainer();
+
+ // For each configured attribute index.
+ for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
{
- indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
+ int indexEntryLimit = config.getBackendIndexEntryLimit();
+ if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
+ {
+ indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
+ }
+
+ if (attrIndex.equalityIndex != null)
+ {
+ Index index = attrIndex.equalityIndex;
+ IndexMergeThread indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig, index,
+ indexEntryLimit);
+ mergers.add(indexMergeThread);
+ }
+ if (attrIndex.presenceIndex != null)
+ {
+ Index index = attrIndex.presenceIndex;
+ IndexMergeThread indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig, index,
+ indexEntryLimit);
+ mergers.add(indexMergeThread);
+ }
+ if (attrIndex.substringIndex != null)
+ {
+ Index index = attrIndex.substringIndex;
+ IndexMergeThread indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig, index,
+ indexEntryLimit);
+ mergers.add(indexMergeThread);
+ }
+ if (attrIndex.orderingIndex != null)
+ {
+ Index index = attrIndex.orderingIndex;
+ IndexMergeThread indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig, index,
+ indexEntryLimit);
+ mergers.add(indexMergeThread);
+ }
+ if (attrIndex.approximateIndex != null)
+ {
+ Index index = attrIndex.approximateIndex;
+ IndexMergeThread indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig, index,
+ indexEntryLimit);
+ mergers.add(indexMergeThread);
+ }
}
- if (attrIndex.equalityIndex != null)
+ // Id2Children index.
+ Index id2Children = entryContainer.getID2Children();
+ IndexMergeThread indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig,
+ id2Children,
+ config.getBackendIndexEntryLimit());
+ mergers.add(indexMergeThread);
+
+ // Id2Subtree index.
+ Index id2Subtree = entryContainer.getID2Subtree();
+ indexMergeThread =
+ new IndexMergeThread(config,
+ ldifImportConfig,
+ id2Subtree,
+ config.getBackendIndexEntryLimit());
+ mergers.add(indexMergeThread);
+ }
+
+ // Run all the merge threads.
+ for (IndexMergeThread imt : mergers)
+ {
+ imt.start();
+ }
+
+ // Wait for the threads to finish.
+ for (IndexMergeThread imt : mergers)
+ {
+ try
{
- Index index = attrIndex.equalityIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
+ imt.join();
}
- if (attrIndex.presenceIndex != null)
+ catch (InterruptedException e)
{
- Index index = attrIndex.presenceIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.substringIndex != null)
- {
- Index index = attrIndex.substringIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.orderingIndex != null)
- {
- Index index = attrIndex.orderingIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.approximateIndex != null)
- {
- Index index = attrIndex.approximateIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
}
- // Id2Children index.
- Index id2Children = entryContainer.getID2Children();
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig,
- id2Children,
- config.getBackendIndexEntryLimit());
- mergers.add(indexMergeThread);
+ long mergeEndTime = System.currentTimeMillis();
- // Id2Subtree index.
- Index id2Subtree = entryContainer.getID2Subtree();
- indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig,
- id2Subtree,
- config.getBackendIndexEntryLimit());
- mergers.add(indexMergeThread);
- }
-
- // Run all the merge threads.
- for (IndexMergeThread imt : mergers)
- {
- imt.start();
- }
-
- // Wait for the threads to finish.
- for (IndexMergeThread imt : mergers)
- {
- try
+ if (moreData)
{
- imt.join();
+ int msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
+ String message =
+ getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
}
- catch (InterruptedException e)
+ else
{
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
+ int msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
+ String message =
+ getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+ }
+ finally
+ {
+ if(moreData)
+ {
+ startWorkerThreads();
}
}
}
- /**
- * Create a set of worker threads, one set for each base DN.
- * Read each entry from the LDIF and determine which
- * base DN the entry belongs to. Write the dn2id database, then put the
- * entry on the appropriate queue for the worker threads to consume.
- * Record the entry count for each base DN when all entries have been
- * processed.
- *
- * @return true if thre is more data to be read from the LDIF file (the import
- * pass size was reached), false if the entire LDIF file has been read.
- *
- * @throws JebException If an error occurs in the JE backend.
- * @throws DatabaseException If an error occurs in the JE database.
- * @throws IOException If a problem occurs while opening the LDIF file for
- * reading, or while reading from the LDIF file.
- */
- private boolean processLDIF()
- throws JebException, DatabaseException, IOException
+ private void startWorkerThreads() throws DatabaseException
{
- boolean moreData = false;
-
// Create one set of worker threads for each base DN.
int importThreadCount = config.getBackendImportThreadCount();
for (ImportContext ic : importMap.values())
@@ -457,122 +505,291 @@
}
}
- try
+ // Start a timer for the progress report.
+ timer = new Timer();
+ TimerTask progressTask = new ImportJob.ProgressTask();
+ timer.scheduleAtFixedRate(progressTask, progressInterval,
+ progressInterval);
+ }
+
+ private void stopWorkerThreads()
+ {
+ if(threads.size() > 0)
{
- // Create a counter to use to determine whether we've hit the import
- // pass size.
- int entriesProcessed = 0;
- int importPassSize = config.getBackendImportPassSize();
- if (importPassSize <= 0)
+ // Wait for the queues to be drained.
+ for (ImportContext ic : importMap.values())
{
- importPassSize = Integer.MAX_VALUE;
- }
-
- // Start a timer for the progress report.
- Timer timer = new Timer();
- TimerTask progressTask = new ImportJob.ProgressTask();
- timer.scheduleAtFixedRate(progressTask, progressInterval,
- progressInterval);
-
- try
- {
- do
+ while (ic.getQueue().size() > 0)
{
- if(threads.size() <= 0)
- {
- int msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
- String msg = getMessage(msgID);
- throw new JebException(msgID, msg);
- }
-
try
{
- // Read the next entry.
- Entry entry = reader.readEntry();
-
- // Check for end of file.
- if (entry == null)
- {
- break;
- }
-
- // Route it according to base DN.
- ImportContext importContext = getImportConfig(entry.getDN());
-
- processEntry(importContext, entry);
-
- entriesProcessed++;
- if (entriesProcessed >= importPassSize)
- {
- moreData = true;
- break;
- }
- }
- catch (LDIFException e)
+ Thread.sleep(100);
+ } catch (Exception e)
{
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
+ // No action needed.
}
- catch (DirectoryException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- } while (true);
+ }
+ }
+ }
- if(threads.size() > 0)
+ // Order the threads to stop.
+ for (ImportThread t : threads)
+ {
+ t.stopProcessing();
+ }
+
+ // Wait for each thread to stop.
+ for (ImportThread t : threads)
+ {
+ try
+ {
+ t.join();
+ importedCount += t.getImportedCount();
+ }
+ catch (InterruptedException ie)
+ {
+ // No action needed?
+ }
+ }
+
+ timer.cancel();
+ }
+
+ /**
+ * Create a set of worker threads, one set for each base DN.
+ * Read each entry from the LDIF and determine which
+ * base DN the entry belongs to. Write the dn2id database, then put the
+ * entry on the appropriate queue for the worker threads to consume.
+ * Record the entry count for each base DN when all entries have been
+ * processed.
+ *
+ * pass size was reached), false if the entire LDIF file has been read.
+ *
+ * @throws JebException If an error occurs in the JE backend.
+ * @throws DatabaseException If an error occurs in the JE database.
+ * @throws IOException If a problem occurs while opening the LDIF file for
+ * reading, or while reading from the LDIF file.
+ */
+ private void processLDIF()
+ throws JebException, DatabaseException, IOException
+ {
+ int msgID = MSGID_JEB_IMPORT_LDIF_START;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+
+ do
+ {
+ if(threads.size() <= 0)
+ {
+ msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+ message = getMessage(msgID);
+ throw new JebException(msgID, message);
+ }
+ try
+ {
+ // Read the next entry.
+ Entry entry = reader.readEntry();
+
+ // Check for end of file.
+ if (entry == null)
{
- // Wait for the queues to be drained.
- for (ImportContext ic : importMap.values())
+ msgID = MSGID_JEB_IMPORT_LDIF_END;
+ message = getMessage(msgID);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+
+ break;
+ }
+
+ // Route it according to base DN.
+ ImportContext importContext = getImportConfig(entry.getDN());
+
+ processEntry(importContext, entry);
+
+ entriesProcessed++;
+ if (entriesProcessed >= importPassSize)
+ {
+ merge(false);
+ entriesProcessed = 0;
+ }
+ }
+ catch (LDIFException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ catch (DirectoryException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ } while (true);
+ }
+
+ private void migrateExistingEntries()
+ throws JebException, DatabaseException, DirectoryException
+ {
+ for(ImportContext importContext : importMap.values())
+ {
+ EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
+ if(srcEntryContainer != null &&
+ !importContext.getIncludeBranches().isEmpty())
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ OperationStatus status;
+
+ int msgID = MSGID_JEB_IMPORT_MIGRATION_START;
+ String message = getMessage(msgID, "existing",
+ importContext.getBaseDN());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+
+ Cursor cursor =
+ srcEntryContainer.getDN2ID().openCursor(null,
+ CursorConfig.READ_COMMITTED);
+ try
+ {
+ status = cursor.getFirst(key, data, lockMode);
+
+ while(status == OperationStatus.SUCCESS)
{
- while (ic.getQueue().size() > 0)
+ if(threads.size() <= 0)
{
- try
+ msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+ message = getMessage(msgID);
+ throw new JebException(msgID, message);
+ }
+
+ DN dn = DN.decode(new ASN1OctetString(key.getData()));
+ if(!importContext.getIncludeBranches().contains(dn))
+ {
+ EntryID id = new EntryID(data);
+ Entry entry = srcEntryContainer.getID2Entry().get(null, id);
+ processEntry(importContext, entry);
+
+ entriesProcessed++;
+ migratedCount++;
+ if (entriesProcessed >= importPassSize)
{
- Thread.sleep(100);
- } catch (Exception e)
+ merge(true);
+ entriesProcessed = 0;
+ }
+ status = cursor.getNext(key, data, lockMode);
+ }
+ else
+ {
+ // This is the base entry for a branch that will be included
+ // in the import so we don't want to copy the branch to the new
+ // entry container.
+
+ /**
+ * Advance the cursor to next entry at the same level in the DIT
+ * skipping all the entries in this branch.
+ * Set the next starting value to a value of equal length but
+ * slightly greater than the previous DN. Since keys are compared
+ * in reverse order we must set the first byte (the comma).
+ * No possibility of overflow here.
+ */
+ byte[] begin =
+ StaticUtils.getBytes("," + dn.toNormalizedString());
+ begin[0] = (byte) (begin[0] + 1);
+ key.setData(begin);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+ }
+ }
+ }
+ finally
+ {
+ cursor.close();
+ }
+ }
+ }
+ }
+
+ private void migrateExcludedEntries()
+ throws JebException, DatabaseException
+ {
+ for(ImportContext importContext : importMap.values())
+ {
+ EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
+ if(srcEntryContainer != null &&
+ !importContext.getExcludeBranches().isEmpty())
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ OperationStatus status;
+
+ int msgID = MSGID_JEB_IMPORT_MIGRATION_START;
+ String message = getMessage(msgID, "excluded",
+ importContext.getBaseDN());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+
+ Cursor cursor =
+ srcEntryContainer.getDN2ID().openCursor(null,
+ CursorConfig.READ_COMMITTED);
+ Comparator<byte[]> dn2idComparator =
+ srcEntryContainer.getDN2ID().getComparator();
+ try
+ {
+ for(DN excludedDN : importContext.getExcludeBranches())
+ {
+ byte[] suffix =
+ StaticUtils.getBytes(excludedDN.toNormalizedString());
+ key.setData(suffix);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+
+ if(status == OperationStatus.SUCCESS &&
+ Arrays.equals(key.getData(), suffix))
+ {
+ // This is the base entry for a branch that was excluded in the
+ // import so we must migrate all entries in this branch over to
+ // the new entry container.
+
+ byte[] end =
+ StaticUtils.getBytes("," + excludedDN.toNormalizedString());
+ end[0] = (byte) (end[0] + 1);
+
+ while(status == OperationStatus.SUCCESS &&
+ dn2idComparator.compare(key.getData(), end) < 0)
{
- // No action needed.
+ if(threads.size() <= 0)
+ {
+ msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
+ message = getMessage(msgID);
+ throw new JebException(msgID, message);
+ }
+
+ EntryID id = new EntryID(data);
+ Entry entry = srcEntryContainer.getID2Entry().get(null, id);
+ processEntry(importContext, entry);
+
+ entriesProcessed++;
+ migratedCount++;
+ if (entriesProcessed >= importPassSize)
+ {
+ merge(true);
+ entriesProcessed = 0;
+ }
+ status = cursor.getNext(key, data, lockMode);
}
}
}
}
-
- }
- finally
- {
- timer.cancel();
- }
- }
- finally
- {
- // Order the threads to stop.
- for (ImportThread t : threads)
- {
- t.stopProcessing();
- }
-
- // Wait for each thread to stop.
- for (ImportThread t : threads)
- {
- try
+ finally
{
- t.join();
- importedCount += t.getImportedCount();
- }
- catch (InterruptedException ie)
- {
- // No action needed?
+ cursor.close();
}
}
}
-
-
- return moreData;
}
/**
@@ -799,6 +1016,133 @@
return importContext;
}
+ private ImportContext getImportContext(EntryContainer entryContainer,
+ long bufferSize)
+ throws DatabaseException, JebException, ConfigException
+ {
+ DN baseDN = entryContainer.getBaseDN();
+ EntryContainer srcEntryContainer = null;
+ List<DN> includeBranches = new ArrayList<DN>();
+ List<DN> excludeBranches = new ArrayList<DN>();
+
+ if(!ldifImportConfig.appendToExistingData() &&
+ !ldifImportConfig.clearBackend())
+ {
+ for(DN dn : ldifImportConfig.getExcludeBranches())
+ {
+ if(baseDN.equals(dn))
+ {
+ // This entire base DN was explicitly excluded. Skip.
+ return null;
+ }
+ if(baseDN.isAncestorOf(dn))
+ {
+ excludeBranches.add(dn);
+ }
+ }
+
+ if(!ldifImportConfig.getIncludeBranches().isEmpty())
+ {
+ for(DN dn : ldifImportConfig.getIncludeBranches())
+ {
+ if(baseDN.isAncestorOf(dn))
+ {
+ includeBranches.add(dn);
+ }
+ }
+
+ if(includeBranches.isEmpty())
+ {
+ // There are no branches in the explicitly defined include list under
+ // this base DN. Skip this base DN alltogether.
+
+ return null;
+ }
+
+ // Remove any overlapping include branches.
+ for(DN includeDN : includeBranches)
+ {
+ boolean keep = true;
+ for(DN dn : includeBranches)
+ {
+ if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
+ {
+ keep = false;
+ break;
+ }
+ }
+ if(!keep)
+ {
+ includeBranches.remove(includeDN);
+ }
+ }
+
+ // Remvoe any exclude branches that are not are not under a include
+ // branch since they will be migrated as part of the existing entries
+ // outside of the include branches anyways.
+ for(DN excludeDN : excludeBranches)
+ {
+ boolean keep = false;
+ for(DN includeDN : includeBranches)
+ {
+ if(includeDN.isAncestorOf(excludeDN))
+ {
+ keep = true;
+ break;
+ }
+ }
+ if(!keep)
+ {
+ excludeBranches.remove(excludeDN);
+ }
+ }
+
+ if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
+ includeBranches.get(0).equals(baseDN))
+ {
+ // This entire base DN is explicitly included in the import with
+ // no exclude branches that we need to migrate. Just clear the entry
+ // container.
+ entryContainer.exclusiveLock.lock();
+ entryContainer.clear();
+ entryContainer.exclusiveLock.unlock();
+ }
+ else
+ {
+ // Create a temp entry container
+ srcEntryContainer = entryContainer;
+ entryContainer =
+ rootContainer.openEntryContainer(baseDN,
+ baseDN.toNormalizedString() +
+ "_importTmp");
+ }
+ }
+ }
+
+ // Create an import context.
+ ImportContext importContext = new ImportContext();
+ importContext.setBufferSize(bufferSize);
+ importContext.setConfig(config);
+ importContext.setLDIFImportConfig(this.ldifImportConfig);
+ importContext.setLDIFReader(reader);
+
+ importContext.setBaseDN(baseDN);
+ importContext.setEntryContainer(entryContainer);
+ importContext.setSrcEntryContainer(srcEntryContainer);
+ importContext.setBufferSize(bufferSize);
+
+ // Create an entry queue.
+ LinkedBlockingQueue<Entry> queue =
+ new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
+ importContext.setQueue(queue);
+
+ // Set the include and exclude branches
+ importContext.setIncludeBranches(includeBranches);
+ importContext.setExcludeBranches(excludeBranches);
+
+ return importContext;
+ }
+
/**
* This class reports progress of the import job at fixed intervals.
*/
@@ -841,7 +1185,7 @@
*/
public void run()
{
- long latestCount = reader.getEntriesRead();
+ long latestCount = reader.getEntriesRead() + migratedCount;
long deltaCount = (latestCount - previousCount);
long latestTime = System.currentTimeMillis();
long deltaTime = latestTime - previousTime;
@@ -858,7 +1202,7 @@
int msgID = MSGID_JEB_IMPORT_PROGRESS_REPORT;
String message = getMessage(msgID, numRead, numIgnored, numRejected,
- rate);
+ migratedCount, rate);
logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
message, msgID);
--
Gitblit v1.10.0