From 98b7ac3c69f631226e11dbd242025b49a811ea10 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Tue, 01 Apr 2008 11:34:19 +0000
Subject: [PATCH] Fix occasional null pointer exception when an ancestor DN hasn't yet been added to dn2id by another work thread. Also simplify buffer flushing at the end of the import. Issue 3083.
---
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java | 43 ++++++++-----
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 19 ++---
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java | 78 ++-----------------------
3 files changed, 41 insertions(+), 99 deletions(-)
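
Note for reviewers: the null pointer exception came from a race where one work
thread looks up an ancestor DN in dn2id before the thread importing that
ancestor has inserted it. The fix retries the lookup a bounded number of times.
Below is a minimal sketch of that approach, assuming the existing jeb importLDIF
types (DN2ID, DN, EntryID, Transaction); the actual helper is the
getAncestorID() method added in the WorkThread.java hunk at the end of this
patch, which differs slightly in structure but not in behavior:

    // Poll dn2id a few times so another work thread has a chance to insert
    // the ancestor DN before we give up and let the caller reject the entry.
    private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
        throws DatabaseException {
      final int maxRetries = 4;          // roughly 200 ms total at 50 ms per pause
      EntryID nodeID = dn2id.get(txn, dn);
      for (int i = 0; nodeID == null && i < maxRetries; i++) {
        try {
          Thread.sleep(50);              // give the other work thread time to catch up
        } catch (InterruptedException e) {
          return null;                   // treat interruption as "not found"
        }
        nodeID = dn2id.get(txn, dn);
      }
      return nodeID;                     // null tells the caller the ancestor never appeared
    }

Bounding the retries keeps the import from blocking indefinitely if the ancestor
never shows up. The flushing change is independent: the per-thread flushAll(id)
partitioning and its lock/condition hand-off are replaced by a single-threaded
flushAll() that the importer calls only after all work threads have stopped, so
no coordination between flushing threads is needed.
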
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
index 16afb2e..1835c05 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,9 +39,6 @@
import org.opends.messages.Message;
import static org.opends.messages.JebMessages.*;
import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
/**
@@ -53,18 +50,6 @@
public class BufferManager {
- final static int MIN_FLUSH_THREAD_NUM = 2;
-
- //Lock used by the flush condition.
- final Lock lock = new ReentrantLock();
-
- //Used to block flush threads until all have completed any work on the
- //element map.
- final Condition flushCond = lock.newCondition();
-
- //Number of flush flushWaiters waiting to flush.
- private int flushWaiters = 0;
-
//Memory usage counter.
private long memoryUsage=0;
@@ -90,12 +75,6 @@
private final static int TREEMAP_ENTRY_OVERHEAD = 29;
private final static int KEY_ELEMENT_OVERHEAD = 28;
- //Count of number of worker threads allowed to flush at the end of the run.
- private int flushThreadNumber;
-
- //Used to prevent memory flush
- private boolean limitFlush = true;
-
/**
* Create buffer manager instance.
@@ -106,11 +85,6 @@
public BufferManager(long memoryLimit, int importThreadCount) {
this.memoryLimit = memoryLimit;
this.nextElem = null;
- //This limits the number of flush threads to 10 or less.
- if(importThreadCount > MIN_FLUSH_THREAD_NUM)
- this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
- else
- this.flushThreadNumber = importThreadCount;
}
/**
@@ -150,7 +124,7 @@
}
//If over the memory limit and import hasn't completed
//flush some keys from the cache to make room.
- if((memoryUsage > memoryLimit) && limitFlush) {
+ if(memoryUsage > memoryLimit) {
flushUntilUnderLimit();
}
}
@@ -168,7 +142,7 @@
} else {
iter = elementMap.tailMap(nextElem).keySet().iterator();
}
- while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) {
+ while((memoryUsage + extraBytes) > memoryLimit) {
if(iter.hasNext()) {
KeyHashElement curElem = iter.next();
//Never flush undefined elements.
@@ -177,9 +151,7 @@
index.insert(null, new DatabaseEntry(curElem.getKey()),
curElem.getIDSet());
memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
- if(limitFlush) {
- iter.remove();
- }
+ iter.remove();
}
} else {
//Wrapped around, start at the first element.
@@ -196,7 +168,6 @@
* ldif load.
*/
void prepareFlush() {
- limitFlush=false;
Message msg =
NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
logError(msg);
@@ -207,50 +178,15 @@
* share the buffer among the worker threads so this function can be
* multi-threaded.
*
- * @param id The thread id.
* @throws DatabaseException If an error occurred during the insert.
- * @throws InterruptedException If a thread has been interrupted.
*/
- void flushAll(int id) throws DatabaseException, InterruptedException {
- //If the thread ID is greater than the flush thread max return.
- if(id > flushThreadNumber) {
- return;
- } else if (flushThreadNumber > 1) {
- waitToFlush();
- }
+ void flushAll() throws DatabaseException {
TreeSet<KeyHashElement> tSet =
new TreeSet<KeyHashElement>(elementMap.keySet());
- Iterator<KeyHashElement> iter = tSet.iterator();
- int i=0;
- while(iter.hasNext()) {
- KeyHashElement curElem = iter.next();
+ for (KeyHashElement curElem : tSet) {
Index index = curElem.getIndex();
- //Each thread handles a piece of the buffer based on its thread id.
- if((i % flushThreadNumber) == id) {
- index.insert(null, new DatabaseEntry(curElem.getKey()),
- curElem.getIDSet());
- }
- i++;
- }
- }
-
- /**
- * Make the threads that are going to flush wait until all threads have
- * completed inserts into the element map. This prevents concurrency
- * exceptions, especially if the import has been configured for a large
- * number of threads.
- *
- * @throws InterruptedException If the threads are interrupted.
- */
- private void waitToFlush() throws InterruptedException {
- lock.lock();
- try {
- if(flushWaiters++ < flushThreadNumber)
- flushCond.await();
- else
- flushCond.signalAll();
- } finally {
- lock.unlock();
+ index.insert(null, new DatabaseEntry(curElem.getKey()),
+ curElem.getIDSet());
}
}
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 85b9ebe..f3350ab 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -489,7 +489,7 @@
private void abortImport() throws JebException {
//Stop work threads telling them to skip substring flush.
- stopWorkThreads(false, true);
+ stopWorkThreads(false);
timer.cancel();
Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
throw new JebException(message);
@@ -498,17 +498,13 @@
/**
* Stop work threads.
*
- * @param flushBuffer Flag telling threads that it should do substring flush.
* @param abort <CODE>True</CODE> if stop work threads was called from an
* abort.
* @throws JebException if a Jeb error occurs.
*/
private void
- stopWorkThreads(boolean flushBuffer, boolean abort) throws JebException {
+ stopWorkThreads(boolean abort) throws JebException {
for (WorkThread t : threads) {
- if(!flushBuffer) {
- t.setFlush(false);
- }
t.stopProcessing();
}
// Wait for each thread to stop.
@@ -537,13 +533,14 @@
Message msg;
//Drain the work queue.
drainWorkQueue();
- //Prepare the buffer managers to flush.
- for(DNContext context : importMap.values()) {
- context.getBufferManager().prepareFlush();
- }
pTask.setPause(true);
long startTime = System.currentTimeMillis();
- stopWorkThreads(true, false);
+ stopWorkThreads(true);
+ //Flush the buffer managers.
+ for(DNContext context : importMap.values()) {
+ context.getBufferManager().prepareFlush();
+ context.getBufferManager().flushAll();
+ }
long finishTime = System.currentTimeMillis();
long flushTime = (finishTime - startTime) / 1000;
msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
index df81ca7..10f6fed 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
@@ -76,9 +76,6 @@
//The substring buffer manager to use.
private BufferManager bufferMgr;
- //Flag set when substring buffer should be flushed.
- private boolean flushBuffer = true;
-
/**
* Create a work thread instance using the specified parameters.
*
@@ -112,15 +109,6 @@
stopRequested = true;
}
-
- /**
- * Tells thread to flush substring buffer.
- *
- * @param flush Set to false if substring flush should be skipped.
- */
- void setFlush(boolean flush) {
- this.flushBuffer = flush;
- }
/**
* Run the thread. Read from item from queue and give it to the
* buffer manage, unless told to stop. Once stopped, ask buffer manager
@@ -143,9 +131,6 @@
}
}
} while (!stopRequested);
- if(flushBuffer) {
- bufferMgr.flushAll(threadNumber);
- }
} catch (Exception e) {
if (debugEnabled()) {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -403,6 +388,7 @@
IDs.set(0, entryID);
}
else {
+ EntryID nodeID;
IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
IDs.add(entryID);
if (parentID != null)
@@ -411,8 +397,11 @@
EntryContainer ec = context.getEntryContainer();
for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
dn = ec.getParentWithinBase(dn)) {
- EntryID nodeID = dn2id.get(txn, dn);
- IDs.add(nodeID);
+ if((nodeID = getAncestorID(dn2id, dn, txn)) == null) {
+ return false;
+ } else {
+ IDs.add(nodeID);
+ }
}
}
}
@@ -422,6 +411,26 @@
return true;
}
+ private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
+ throws DatabaseException {
+ int i=0;
+ EntryID nodeID = dn2id.get(txn, dn);
+ if(nodeID == null) {
+ while((nodeID = dn2id.get(txn, dn)) == null) {
+ try {
+ Thread.sleep(50);
+ if(i == 3) {
+ return null;
+ }
+ i++;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }
+ return nodeID;
+ }
+
/**
* Process the a entry from the work element into the dn2id DB.
*
--
Gitblit v1.10.0