From 98b7ac3c69f631226e11dbd242025b49a811ea10 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Tue, 01 Apr 2008 11:34:19 +0000
Subject: [PATCH] Fix occansional null pointer exception when an ancestor dn hasn't yet been added to dn2id by another work thread. Also simplify buffer flushing at end of the import. Issue 3083.

---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java    |   43 ++++++++-----
 opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java      |   19 ++---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java |   78 ++-----------------------
 3 files changed, 41 insertions(+), 99 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
index 16afb2e..1835c05 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,9 +39,6 @@
 import org.opends.messages.Message;
 import static org.opends.messages.JebMessages.*;
 import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
 
 
 /**
@@ -53,18 +50,6 @@
 
 public class BufferManager {
 
-  final static int MIN_FLUSH_THREAD_NUM = 2;
-
-  //Lock used by the flush condition.
-  final Lock lock = new ReentrantLock();
-
-  //Used to block flush threads until all have completed any work on the
-  //element map.
-  final Condition flushCond = lock.newCondition();
-
-  //Number of flush flushWaiters waiting to flush.
-  private int flushWaiters = 0;
-
   //Memory usage counter.
   private long memoryUsage=0;
 
@@ -90,12 +75,6 @@
   private final static int TREEMAP_ENTRY_OVERHEAD = 29;
   private final static int KEY_ELEMENT_OVERHEAD = 28;
 
-  //Count of number of worker threads allowed to flush at the end of the run.
-  private int flushThreadNumber;
-
-  //Used to prevent memory flush
-  private boolean limitFlush = true;
-
 
   /**
    * Create buffer manager instance.
@@ -106,11 +85,6 @@
   public BufferManager(long memoryLimit, int importThreadCount) {
     this.memoryLimit = memoryLimit;
     this.nextElem = null;
-    //This limits the number of flush threads to 10 or less.
-    if(importThreadCount > MIN_FLUSH_THREAD_NUM)
-     this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
-    else
-      this.flushThreadNumber = importThreadCount;
   }
 
   /**
@@ -150,7 +124,7 @@
        }
        //If over the memory limit and import hasn't completed
       //flush some keys from the cache to make room.
-       if((memoryUsage > memoryLimit) && limitFlush) {
+       if(memoryUsage > memoryLimit) {
          flushUntilUnderLimit();
        }
     }
@@ -168,7 +142,7 @@
     } else {
       iter = elementMap.tailMap(nextElem).keySet().iterator();
     }
-    while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) {
+    while((memoryUsage + extraBytes) > memoryLimit) {
       if(iter.hasNext()) {
         KeyHashElement curElem = iter.next();
         //Never flush undefined elements.
@@ -177,9 +151,7 @@
           index.insert(null, new DatabaseEntry(curElem.getKey()),
                   curElem.getIDSet());
           memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
-          if(limitFlush) {
-            iter.remove();
-          }
+          iter.remove();
         }
       } else {
         //Wrapped around, start at the first element.
@@ -196,7 +168,6 @@
    * ldif load.
    */
   void prepareFlush() {
-    limitFlush=false;
     Message msg =
            NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
     logError(msg);
@@ -207,50 +178,15 @@
    * share the buffer among the worker threads so this function can be
    * multi-threaded.
    *
-   * @param id The thread id.
    * @throws DatabaseException If an error occurred during the insert.
-   * @throws InterruptedException If a thread has been interrupted.
    */
-  void flushAll(int id) throws DatabaseException, InterruptedException {
-    //If the thread ID is greater than the flush thread max return.
-    if(id > flushThreadNumber) {
-      return;
-    } else if (flushThreadNumber > 1) {
-       waitToFlush();
-    }
+  void flushAll() throws DatabaseException {
     TreeSet<KeyHashElement>  tSet =
             new TreeSet<KeyHashElement>(elementMap.keySet());
-    Iterator<KeyHashElement> iter = tSet.iterator();
-    int i=0;
-    while(iter.hasNext()) {
-      KeyHashElement curElem = iter.next();
+    for (KeyHashElement curElem : tSet) {
       Index index = curElem.getIndex();
-      //Each thread handles a piece of the buffer based on its thread id.
-      if((i % flushThreadNumber) == id) {
-        index.insert(null, new DatabaseEntry(curElem.getKey()),
-                curElem.getIDSet());
-      }
-      i++;
-    }
-  }
-
-  /**
-   * Make the threads that are going to flush wait until all threads have
-   * completed inserts into the element map. This prevents concurrency
-   * exceptions, especially if the import has been configured for a large
-   * number of threads.
-   *
-   * @throws InterruptedException  If the threads are interrupted.
-   */
-  private void waitToFlush() throws InterruptedException {
-    lock.lock();
-    try {
-      if(flushWaiters++ < flushThreadNumber)
-        flushCond.await();
-      else
-        flushCond.signalAll();
-    } finally {
-      lock.unlock();
+      index.insert(null, new DatabaseEntry(curElem.getKey()),
+              curElem.getIDSet());
     }
   }
 
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 85b9ebe..f3350ab 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -489,7 +489,7 @@
 
   private void abortImport() throws JebException {
     //Stop work threads telling them to skip substring flush.
-     stopWorkThreads(false, true);
+     stopWorkThreads(false);
      timer.cancel();
      Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
      throw new JebException(message);
@@ -498,17 +498,13 @@
   /**
    * Stop work threads.
    *
-   * @param flushBuffer Flag telling threads that it should do substring flush.
    * @param abort <CODE>True</CODE> if stop work threads was called from an
    *              abort.
    * @throws JebException if a Jeb error occurs.
    */
   private void
-  stopWorkThreads(boolean flushBuffer, boolean abort) throws JebException {
+  stopWorkThreads(boolean abort) throws JebException {
     for (WorkThread t : threads) {
-      if(!flushBuffer) {
-        t.setFlush(false);
-      }
       t.stopProcessing();
     }
     // Wait for each thread to stop.
@@ -537,13 +533,14 @@
      Message msg;
     //Drain the work queue.
     drainWorkQueue();
-    //Prepare the buffer managers to flush.
-    for(DNContext context : importMap.values()) {
-      context.getBufferManager().prepareFlush();
-    }
     pTask.setPause(true);
     long startTime = System.currentTimeMillis();
-    stopWorkThreads(true, false);
+    stopWorkThreads(true);
+    //Flush the buffer managers.
+    for(DNContext context : importMap.values()) {
+      context.getBufferManager().prepareFlush();
+      context.getBufferManager().flushAll();
+    }
     long finishTime = System.currentTimeMillis();
     long flushTime = (finishTime - startTime) / 1000;
      msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
index df81ca7..10f6fed 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
@@ -76,9 +76,6 @@
   //The substring buffer manager to use.
   private BufferManager bufferMgr;
 
-  //Flag set when substring buffer should be flushed.
-  private boolean flushBuffer = true;
-
   /**
    * Create a work thread instance using the specified parameters.
    *
@@ -112,15 +109,6 @@
     stopRequested = true;
   }
 
-
-  /**
-   * Tells thread to flush substring buffer.
-   *
-   * @param flush Set to false if substring flush should be skipped.
-   */
-   void setFlush(boolean flush) {
-    this.flushBuffer = flush;
-  }
   /**
    * Run the thread. Read from item from queue and give it to the
    * buffer manage, unless told to stop. Once stopped, ask buffer manager
@@ -143,9 +131,6 @@
           }
         }
       } while (!stopRequested);
-      if(flushBuffer) {
-        bufferMgr.flushAll(threadNumber);
-      }
     } catch (Exception e) {
       if (debugEnabled()) {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -403,6 +388,7 @@
       IDs.set(0, entryID);
     }
     else {
+      EntryID nodeID;
       IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
       IDs.add(entryID);
       if (parentID != null)
@@ -411,8 +397,11 @@
         EntryContainer ec = context.getEntryContainer();
         for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
              dn = ec.getParentWithinBase(dn)) {
-          EntryID nodeID = dn2id.get(txn, dn);
-          IDs.add(nodeID);
+          if((nodeID =  getAncestorID(dn2id, dn, txn)) == null) {
+            return false;
+          } else {
+            IDs.add(nodeID);
+          }
         }
       }
     }
@@ -422,6 +411,26 @@
     return true;
   }
 
+  private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
+          throws DatabaseException {
+    int i=0;
+    EntryID nodeID = dn2id.get(txn, dn);
+    if(nodeID == null) {
+      while((nodeID = dn2id.get(txn, dn)) == null) {
+        try {
+          Thread.sleep(50);
+          if(i == 3) {
+            return null;
+          }
+          i++;
+        } catch (Exception e) {
+          return null;
+        }
+      }
+    }
+    return nodeID;
+  }
+
   /**
    * Process the a entry from the work element into the dn2id DB.
    *

--
Gitblit v1.10.0