From 68cc46a7f02167dcfe7dc92a8a2b96cebf125ed9 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Thu, 27 Mar 2008 04:41:22 +0000
Subject: [PATCH] Fix for issue 3083 ConcurrentModificationException import abort.

---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java |   72 +++++++++++++++++++++++++++++++-----
 1 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
index e6a42ca..65e16f9 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,6 +39,9 @@
 import org.opends.messages.Message;
 import static org.opends.messages.JebMessages.*;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
 
 
 /**
@@ -50,6 +53,18 @@
 
 public class BufferManager {
 
+  final static int MIN_FLUSH_THREAD_NUM = 2;
+
+  //Lock used by the flush condition.
+  final Lock lock = new ReentrantLock();
+
+  //Used to block flush threads until all have completed any work on the
+  //element map.
+  final Condition flushCond = lock.newCondition();
+
+  //Number of flush flushWaiters waiting to flush.
+  private int flushWaiters = 0;
+
   //Memory usage counter.
   private long memoryUsage=0;
 
@@ -75,8 +90,8 @@
   private final static int TREEMAP_ENTRY_OVERHEAD = 29;
   private final static int KEY_ELEMENT_OVERHEAD = 28;
 
-  //Import worker thread count.
-  private int importThreadCount;
+  //Count of number of worker threads allowed to flush at the end of the run.
+  private int flushThreadNumber;
 
   //Used to prevent memory flush
   private boolean limitFlush = true;
@@ -91,7 +106,12 @@
   public BufferManager(long memoryLimit, int importThreadCount) {
     this.memoryLimit = memoryLimit;
     this.nextElem = null;
-    this.importThreadCount = importThreadCount;
+    //This limits the number of flush threads to 10 or less.
+    if(importThreadCount > MIN_FLUSH_THREAD_NUM)
+     this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
+    else
+      this.flushThreadNumber = importThreadCount;
+System.out.println("Num: " + flushThreadNumber);
   }
 
   /**
@@ -184,22 +204,54 @@
   }
 
   /**
-   * Writes all of the buffer elements to DB. Thread id 0 always peforms the
-   * flushing.
+   * Writes all of the buffer elements to DB. The specific id is used to
+   * share the buffer among the worker threads so this function can be
+   * multi-threaded.
    *
    * @param id The thread id.
    * @throws DatabaseException If an error occurred during the insert.
+   * @throws InterruptedException If a thread has been interrupted.
    */
-  void flushAll(int id) throws DatabaseException {
-    if(id != 0) {
+  void flushAll(int id) throws DatabaseException, InterruptedException {
+    //If the thread ID is greater than the flush thread max return.
+    if(id > flushThreadNumber) {
       return;
+    } else if (flushThreadNumber > 1) {
+       waitToFlush();
     }
     TreeSet<KeyHashElement>  tSet =
             new TreeSet<KeyHashElement>(elementMap.keySet());
-    for (KeyHashElement curElem : tSet) {
+    Iterator<KeyHashElement> iter = tSet.iterator();
+    int i=0;
+    while(iter.hasNext()) {
+      KeyHashElement curElem = iter.next();
       Index index = curElem.getIndex();
-      index.insert(null, new DatabaseEntry(curElem.getKey()),
-              curElem.getIDSet());
+      //Each thread handles a piece of the buffer based on its thread id.
+      if((i % flushThreadNumber) == id) {
+        index.insert(null, new DatabaseEntry(curElem.getKey()),
+                curElem.getIDSet());
+      }
+      i++;
+    }
+  }
+
+  /**
+   * Make the threads that are going to flush wait until all threads have
+   * completed inserts into the element map. This prevents concurrency
+   * exceptions, especially if the import has been configured for a large
+   * number of threads.
+   *
+   * @throws InterruptedException  If the threads are interrupted.
+   */
+  private void waitToFlush() throws InterruptedException {
+    lock.lock();
+    try {
+      if(flushWaiters++ < flushThreadNumber)
+        flushCond.await();
+      else
+        flushCond.signalAll();
+    } finally {
+      lock.unlock();
     }
   }
 

--
Gitblit v1.10.0