From 31153ccec78ed6187b9543babccdaf996968d6bb Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Tue, 01 May 2007 17:20:27 +0000
Subject: [PATCH] Fix for issue #1262: database becomes empty when importing a not existing ldif file.
---
opends/src/server/org/opends/server/backends/jeb/ImportJob.java | 470 +++++++++++++++++++++++++++++-----------------------------
1 files changed, 233 insertions(+), 237 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/ImportJob.java b/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
index 6766fc2..02120ab 100644
--- a/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
+++ b/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -150,160 +150,175 @@
envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
envConfig.setConfigParam("je.log.fileMax", "100000000");
*/
- rootContainer = new RootContainer(config, backend);
- if (ldifImportConfig.appendToExistingData())
+
+ // Create an LDIF reader. Throws an exception if the file does not exist.
+ reader = new LDIFReader(ldifImportConfig);
+
+ int msgID;
+ String message;
+ long startTime;
+ try
{
- rootContainer.open(config.getBackendDirectory(),
- config.getBackendPermission(),
- false, true, true, true, true, false);
- }
- else
- {
- rootContainer.open(config.getBackendDirectory(),
- config.getBackendPermission(),
- false, true, false, false, false, false);
- }
+ rootContainer = new RootContainer(config, backend);
+ if (ldifImportConfig.appendToExistingData())
+ {
+ rootContainer.open(config.getBackendDirectory(),
+ config.getBackendPermission(),
+ false, true, true, true, true, false);
+ }
+ else
+ {
+ rootContainer.open(config.getBackendDirectory(),
+ config.getBackendPermission(),
+ false, true, false, false, false, false);
+ }
- if (!ldifImportConfig.appendToExistingData())
- {
- // We have the writer lock on the environment, now delete the
- // environment and re-open it. Only do this when we are
- // importing to all the base DNs in the backend.
- rootContainer.close();
- EnvManager.removeFiles(config.getBackendDirectory().getPath());
- rootContainer.open(config.getBackendDirectory(),
- config.getBackendPermission(),
- false, true, false, false, false, false);
- }
+ if (!ldifImportConfig.appendToExistingData())
+ {
+ // We have the writer lock on the environment, now delete the
+ // environment and re-open it. Only do this when we are
+ // importing to all the base DNs in the backend.
+ rootContainer.close();
+ EnvManager.removeFiles(config.getBackendDirectory().getPath());
+ rootContainer.open(config.getBackendDirectory(),
+ config.getBackendPermission(),
+ false, true, false, false, false, false);
+ }
- // Divide the total buffer size by the number of threads
- // and give that much to each thread.
- int importThreadCount = config.getImportThreadCount();
- long bufferSize = config.getImportBufferSize() /
- (importThreadCount*config.getBaseDNs().length);
+ // Divide the total buffer size by the number of threads
+ // and give that much to each thread.
+ int importThreadCount = config.getImportThreadCount();
+ long bufferSize = config.getImportBufferSize() /
+ (importThreadCount*config.getBaseDNs().length);
- int msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
- String message = getMessage(msgID, importThreadCount);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
+ msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
+ message = getMessage(msgID, importThreadCount);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
- msgID = MSGID_JEB_IMPORT_BUFFER_SIZE;
- message = getMessage(msgID, bufferSize);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
+ msgID = MSGID_JEB_IMPORT_BUFFER_SIZE;
+ message = getMessage(msgID, bufferSize);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
- msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG;
- message = getMessage(msgID,
- rootContainer.getEnvironmentConfig().toString());
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
+ msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG;
+ message = getMessage(msgID,
+ rootContainer.getEnvironmentConfig().toString());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
- DebugLogger.debugInfo(
+ DebugLogger.debugInfo(
rootContainer.getEnvironmentConfig().toString());
- rootContainer.openEntryContainers(config.getBaseDNs());
+ rootContainer.openEntryContainers(config.getBaseDNs());
- // Create the import contexts for each base DN.
- DN baseDN;
+ // Create the import contexts for each base DN.
+ DN baseDN;
- for (EntryContainer entryContainer : rootContainer.getEntryContainers())
- {
- baseDN = entryContainer.getBaseDN();
-
- // Create an import context.
- ImportContext importContext = new ImportContext();
- importContext.setBufferSize(bufferSize);
- importContext.setConfig(config);
- importContext.setLDIFImportConfig(this.ldifImportConfig);
-
- importContext.setBaseDN(baseDN);
- importContext.setContainerName(entryContainer.getContainerName());
- importContext.setEntryContainer(entryContainer);
- importContext.setBufferSize(bufferSize);
-
- // Create an entry queue.
- LinkedBlockingQueue<Entry> queue =
- new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
- importContext.setQueue(queue);
-
- importMap.put(baseDN, importContext);
- }
-
- // Make a note of the time we started.
- long startTime = System.currentTimeMillis();
-
- try
- {
- // Create a temporary work directory.
- File tempDir = new File(config.getImportTempDirectory());
- tempDir.mkdir();
- if (tempDir.listFiles() != null)
+ for (EntryContainer entryContainer : rootContainer.getEntryContainers())
{
- for (File f : tempDir.listFiles())
- {
- f.delete();
- }
+ baseDN = entryContainer.getBaseDN();
+
+ // Create an import context.
+ ImportContext importContext = new ImportContext();
+ importContext.setBufferSize(bufferSize);
+ importContext.setConfig(config);
+ importContext.setLDIFImportConfig(this.ldifImportConfig);
+ importContext.setLDIFReader(reader);
+
+ importContext.setBaseDN(baseDN);
+ importContext.setContainerName(entryContainer.getContainerName());
+ importContext.setEntryContainer(entryContainer);
+ importContext.setBufferSize(bufferSize);
+
+ // Create an entry queue.
+ LinkedBlockingQueue<Entry> queue =
+ new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
+ importContext.setQueue(queue);
+
+ importMap.put(baseDN, importContext);
}
+ // Make a note of the time we started.
+ startTime = System.currentTimeMillis();
+
try
{
- importedCount = 0;
- int passNumber = 1;
- boolean moreData = true;
- while (moreData)
+ // Create a temporary work directory.
+ File tempDir = new File(config.getImportTempDirectory());
+ tempDir.mkdir();
+ if (tempDir.listFiles() != null)
{
- moreData = processLDIF();
- if (moreData)
+ for (File f : tempDir.listFiles())
{
- msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
- message = getMessage(msgID, passNumber++);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
+ f.delete();
}
- else
+ }
+
+ try
+ {
+ importedCount = 0;
+ int passNumber = 1;
+ boolean moreData = true;
+ while (moreData)
{
- msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
- message = getMessage(msgID);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
- }
+ moreData = processLDIF();
+ if (moreData)
+ {
+ msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
+ message = getMessage(msgID, passNumber++);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+ else
+ {
+ msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
+ message = getMessage(msgID);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
- long mergeStartTime = System.currentTimeMillis();
- merge();
- long mergeEndTime = System.currentTimeMillis();
+ long mergeStartTime = System.currentTimeMillis();
+ merge();
+ long mergeEndTime = System.currentTimeMillis();
- if (moreData)
- {
- msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
- message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
+ if (moreData)
+ {
+ msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
+ message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+ else
+ {
+ msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
+ message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
}
- else
- {
- msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
- message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
- }
+ }
+ finally
+ {
+ tempDir.delete();
}
}
finally
{
- tempDir.delete();
+ rootContainer.close();
+
+ // Sync the environment to disk.
+ msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
+ message = getMessage(msgID);
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
+ message, msgID);
}
}
finally
{
- rootContainer.close();
-
- // Sync the environment to disk.
- msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
- message = getMessage(msgID);
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
- message, msgID);
+ reader.close();
}
long finishTime = System.currentTimeMillis();
@@ -438,14 +453,14 @@
}
/**
- * Open a reader for the LDIF file and create a set of worker threads, one
- * set for each base DN. Read each entry from the LDIF and determine which
+ * Create a set of worker threads, one set for each base DN.
+ * Read each entry from the LDIF and determine which
* base DN the entry belongs to. Write the dn2id database, then put the
* entry on the appropriate queue for the worker threads to consume.
* Record the entry count for each base DN when all entries have been
* processed.
*
- * @return true if is more data to be read from the LDIF file (the import
+ * @return true if thre is more data to be read from the LDIF file (the import
* pass size was reached), false if the entire LDIF file has been read.
*
* @throws JebException If an error occurs in the JE backend.
@@ -458,143 +473,124 @@
{
boolean moreData = false;
- // Create an LDIF reader if necessary.
- if (reader == null)
+ ArrayList<ImportThread> threads;
+
+ // Create one set of worker threads for each base DN.
+ int importThreadCount = config.getImportThreadCount();
+ threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
+ for (ImportContext ic : importMap.values())
{
- reader = new LDIFReader(ldifImportConfig);
- for (ImportContext ic : importMap.values())
+ for (int i = 0; i < importThreadCount; i++)
{
- ic.setLDIFReader(reader);
+ ImportThread t = new ImportThread(ic, i);
+ t.setUncaughtExceptionHandler(this);
+ threads.add(t);
+
+ t.start();
}
}
- ArrayList<ImportThread> threads;
try
{
- // Create one set of worker threads for each base DN.
- int importThreadCount = config.getImportThreadCount();
- threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
- for (ImportContext ic : importMap.values())
+ // Create a counter to use to determine whether we've hit the import
+ // pass size.
+ int entriesProcessed = 0;
+ int importPassSize = config.getImportPassSize();
+ if (importPassSize <= 0)
{
- for (int i = 0; i < importThreadCount; i++)
- {
- ImportThread t = new ImportThread(ic, i);
- t.setUncaughtExceptionHandler(this);
- threads.add(t);
-
- t.start();
- }
+ importPassSize = Integer.MAX_VALUE;
}
+ // Start a timer for the progress report.
+ Timer timer = new Timer();
+ TimerTask progressTask = new ImportJob.ProgressTask();
+ timer.scheduleAtFixedRate(progressTask, progressInterval,
+ progressInterval);
+
try
{
- // Create a counter to use to determine whether we've hit the import
- // pass size.
- int entriesProcessed = 0;
- int importPassSize = config.getImportPassSize();
- if (importPassSize <= 0)
- {
- importPassSize = Integer.MAX_VALUE;
- }
-
- // Start a timer for the progress report.
- Timer timer = new Timer();
- TimerTask progressTask = new ImportJob.ProgressTask();
- timer.scheduleAtFixedRate(progressTask, progressInterval,
- progressInterval);
-
- try
- {
- do
- {
- try
- {
- // Read the next entry.
- Entry entry = reader.readEntry();
-
- // Check for end of file.
- if (entry == null)
- {
- break;
- }
-
- // Route it according to base DN.
- ImportContext importContext = getImportConfig(entry.getDN());
-
- processEntry(importContext, entry);
-
- entriesProcessed++;
- if (entriesProcessed >= importPassSize)
- {
- moreData = true;
- break;
- }
- }
- catch (LDIFException e)
- {
- if (debugEnabled())
- {
- debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- catch (DirectoryException e)
- {
- if (debugEnabled())
- {
- debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- } while (true);
-
- // Wait for the queues to be drained.
- for (ImportContext ic : importMap.values())
- {
- while (ic.getQueue().size() > 0)
- {
- try
- {
- Thread.sleep(100);
- } catch (Exception e)
- {
- // No action needed.
- }
- }
- }
-
- }
- finally
- {
- timer.cancel();
- }
- }
- finally
- {
- // Order the threads to stop.
- for (ImportThread t : threads)
- {
- t.stopProcessing();
- }
-
- // Wait for each thread to stop.
- for (ImportThread t : threads)
+ do
{
try
{
- t.join();
- importedCount += t.getImportedCount();
+ // Read the next entry.
+ Entry entry = reader.readEntry();
+
+ // Check for end of file.
+ if (entry == null)
+ {
+ break;
+ }
+
+ // Route it according to base DN.
+ ImportContext importContext = getImportConfig(entry.getDN());
+
+ processEntry(importContext, entry);
+
+ entriesProcessed++;
+ if (entriesProcessed >= importPassSize)
+ {
+ moreData = true;
+ break;
+ }
}
- catch (InterruptedException ie)
+ catch (LDIFException e)
{
- // No action needed?
+ if (debugEnabled())
+ {
+ debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ catch (DirectoryException e)
+ {
+ if (debugEnabled())
+ {
+ debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ } while (true);
+
+ // Wait for the queues to be drained.
+ for (ImportContext ic : importMap.values())
+ {
+ while (ic.getQueue().size() > 0)
+ {
+ try
+ {
+ Thread.sleep(100);
+ } catch (Exception e)
+ {
+ // No action needed.
+ }
}
}
+
+ }
+ finally
+ {
+ timer.cancel();
}
}
finally
{
- if (! moreData)
+ // Order the threads to stop.
+ for (ImportThread t : threads)
{
- reader.close();
+ t.stopProcessing();
+ }
+
+ // Wait for each thread to stop.
+ for (ImportThread t : threads)
+ {
+ try
+ {
+ t.join();
+ importedCount += t.getImportedCount();
+ }
+ catch (InterruptedException ie)
+ {
+ // No action needed?
+ }
}
}
--
Gitblit v1.10.0