From 98b7ac3c69f631226e11dbd242025b49a811ea10 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Tue, 01 Apr 2008 11:34:19 +0000
Subject: [PATCH] Fix occansional null pointer exception when an ancestor dn hasn't yet been added to dn2id by another work thread. Also simplify buffer flushing at end of the import. Issue 3083.

---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java |   78 +++-----------------------------------
 1 files changed, 7 insertions(+), 71 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
index 16afb2e..1835c05 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,9 +39,6 @@
 import org.opends.messages.Message;
 import static org.opends.messages.JebMessages.*;
 import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
 
 
 /**
@@ -53,18 +50,6 @@
 
 public class BufferManager {
 
-  final static int MIN_FLUSH_THREAD_NUM = 2;
-
-  //Lock used by the flush condition.
-  final Lock lock = new ReentrantLock();
-
-  //Used to block flush threads until all have completed any work on the
-  //element map.
-  final Condition flushCond = lock.newCondition();
-
-  //Number of flush flushWaiters waiting to flush.
-  private int flushWaiters = 0;
-
   //Memory usage counter.
   private long memoryUsage=0;
 
@@ -90,12 +75,6 @@
   private final static int TREEMAP_ENTRY_OVERHEAD = 29;
   private final static int KEY_ELEMENT_OVERHEAD = 28;
 
-  //Count of number of worker threads allowed to flush at the end of the run.
-  private int flushThreadNumber;
-
-  //Used to prevent memory flush
-  private boolean limitFlush = true;
-
 
   /**
    * Create buffer manager instance.
@@ -106,11 +85,6 @@
   public BufferManager(long memoryLimit, int importThreadCount) {
     this.memoryLimit = memoryLimit;
     this.nextElem = null;
-    //This limits the number of flush threads to 10 or less.
-    if(importThreadCount > MIN_FLUSH_THREAD_NUM)
-     this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
-    else
-      this.flushThreadNumber = importThreadCount;
   }
 
   /**
@@ -150,7 +124,7 @@
        }
        //If over the memory limit and import hasn't completed
       //flush some keys from the cache to make room.
-       if((memoryUsage > memoryLimit) && limitFlush) {
+       if(memoryUsage > memoryLimit) {
          flushUntilUnderLimit();
        }
     }
@@ -168,7 +142,7 @@
     } else {
       iter = elementMap.tailMap(nextElem).keySet().iterator();
     }
-    while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) {
+    while((memoryUsage + extraBytes) > memoryLimit) {
       if(iter.hasNext()) {
         KeyHashElement curElem = iter.next();
         //Never flush undefined elements.
@@ -177,9 +151,7 @@
           index.insert(null, new DatabaseEntry(curElem.getKey()),
                   curElem.getIDSet());
           memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
-          if(limitFlush) {
-            iter.remove();
-          }
+          iter.remove();
         }
       } else {
         //Wrapped around, start at the first element.
@@ -196,7 +168,6 @@
    * ldif load.
    */
   void prepareFlush() {
-    limitFlush=false;
     Message msg =
            NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
     logError(msg);
@@ -207,50 +178,15 @@
    * share the buffer among the worker threads so this function can be
    * multi-threaded.
    *
-   * @param id The thread id.
    * @throws DatabaseException If an error occurred during the insert.
-   * @throws InterruptedException If a thread has been interrupted.
    */
-  void flushAll(int id) throws DatabaseException, InterruptedException {
-    //If the thread ID is greater than the flush thread max return.
-    if(id > flushThreadNumber) {
-      return;
-    } else if (flushThreadNumber > 1) {
-       waitToFlush();
-    }
+  void flushAll() throws DatabaseException {
     TreeSet<KeyHashElement>  tSet =
             new TreeSet<KeyHashElement>(elementMap.keySet());
-    Iterator<KeyHashElement> iter = tSet.iterator();
-    int i=0;
-    while(iter.hasNext()) {
-      KeyHashElement curElem = iter.next();
+    for (KeyHashElement curElem : tSet) {
       Index index = curElem.getIndex();
-      //Each thread handles a piece of the buffer based on its thread id.
-      if((i % flushThreadNumber) == id) {
-        index.insert(null, new DatabaseEntry(curElem.getKey()),
-                curElem.getIDSet());
-      }
-      i++;
-    }
-  }
-
-  /**
-   * Make the threads that are going to flush wait until all threads have
-   * completed inserts into the element map. This prevents concurrency
-   * exceptions, especially if the import has been configured for a large
-   * number of threads.
-   *
-   * @throws InterruptedException  If the threads are interrupted.
-   */
-  private void waitToFlush() throws InterruptedException {
-    lock.lock();
-    try {
-      if(flushWaiters++ < flushThreadNumber)
-        flushCond.await();
-      else
-        flushCond.signalAll();
-    } finally {
-      lock.unlock();
+      index.insert(null, new DatabaseEntry(curElem.getKey()),
+              curElem.getIDSet());
     }
   }
 

--
Gitblit v1.10.0