From 68cc46a7f02167dcfe7dc92a8a2b96cebf125ed9 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Thu, 27 Mar 2008 04:41:22 +0000
Subject: [PATCH] Fix for issue 3083 ConcurrentModificationException import abort.
---
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java | 72 +++++++++++++++++++++++++++++++-----
1 files changed, 62 insertions(+), 10 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
index e6a42ca..65e16f9 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,6 +39,9 @@
import org.opends.messages.Message;
import static org.opends.messages.JebMessages.*;
import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
/**
@@ -50,6 +53,18 @@
public class BufferManager {
+ final static int MIN_FLUSH_THREAD_NUM = 2;
+
+ //Lock used by the flush condition.
+ final Lock lock = new ReentrantLock();
+
+ //Used to block flush threads until all have completed any work on the
+ //element map.
+ final Condition flushCond = lock.newCondition();
+
+ //Number of flush flushWaiters waiting to flush.
+ private int flushWaiters = 0;
+
//Memory usage counter.
private long memoryUsage=0;
@@ -75,8 +90,8 @@
private final static int TREEMAP_ENTRY_OVERHEAD = 29;
private final static int KEY_ELEMENT_OVERHEAD = 28;
- //Import worker thread count.
- private int importThreadCount;
+ //Count of number of worker threads allowed to flush at the end of the run.
+ private int flushThreadNumber;
//Used to prevent memory flush
private boolean limitFlush = true;
@@ -91,7 +106,12 @@
public BufferManager(long memoryLimit, int importThreadCount) {
this.memoryLimit = memoryLimit;
this.nextElem = null;
- this.importThreadCount = importThreadCount;
+ //This limits the number of flush threads to 10 or less.
+ if(importThreadCount > MIN_FLUSH_THREAD_NUM)
+ this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
+ else
+ this.flushThreadNumber = importThreadCount;
+System.out.println("Num: " + flushThreadNumber);
}
/**
@@ -184,22 +204,54 @@
}
/**
- * Writes all of the buffer elements to DB. Thread id 0 always peforms the
- * flushing.
+ * Writes all of the buffer elements to DB. The specific id is used to
+ * share the buffer among the worker threads so this function can be
+ * multi-threaded.
*
* @param id The thread id.
* @throws DatabaseException If an error occurred during the insert.
+ * @throws InterruptedException If a thread has been interrupted.
*/
- void flushAll(int id) throws DatabaseException {
- if(id != 0) {
+ void flushAll(int id) throws DatabaseException, InterruptedException {
+ //If the thread ID is greater than the flush thread max return.
+ if(id > flushThreadNumber) {
return;
+ } else if (flushThreadNumber > 1) {
+ waitToFlush();
}
TreeSet<KeyHashElement> tSet =
new TreeSet<KeyHashElement>(elementMap.keySet());
- for (KeyHashElement curElem : tSet) {
+ Iterator<KeyHashElement> iter = tSet.iterator();
+ int i=0;
+ while(iter.hasNext()) {
+ KeyHashElement curElem = iter.next();
Index index = curElem.getIndex();
- index.insert(null, new DatabaseEntry(curElem.getKey()),
- curElem.getIDSet());
+ //Each thread handles a piece of the buffer based on its thread id.
+ if((i % flushThreadNumber) == id) {
+ index.insert(null, new DatabaseEntry(curElem.getKey()),
+ curElem.getIDSet());
+ }
+ i++;
+ }
+ }
+
+ /**
+ * Make the threads that are going to flush wait until all threads have
+ * completed inserts into the element map. This prevents concurrency
+ * exceptions, especially if the import has been configured for a large
+ * number of threads.
+ *
+ * @throws InterruptedException If the threads are interrupted.
+ */
+ private void waitToFlush() throws InterruptedException {
+ lock.lock();
+ try {
+ if(flushWaiters++ < flushThreadNumber)
+ flushCond.await();
+ else
+ flushCond.signalAll();
+ } finally {
+ lock.unlock();
}
}
--
Gitblit v1.10.0