mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

dugan
01.34.2008 98b7ac3c69f631226e11dbd242025b49a811ea10
Fix occansional null pointer exception when an ancestor dn hasn't yet been added to dn2id by another work thread. Also simplify buffer flushing at end of the import. Issue 3083.
3 files modified
132 ■■■■ changed files
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java 72 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java 41 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,9 +39,6 @@
import org.opends.messages.Message;
import static org.opends.messages.JebMessages.*;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
/**
@@ -53,18 +50,6 @@
public class BufferManager {
  final static int MIN_FLUSH_THREAD_NUM = 2;
  //Lock used by the flush condition.
  final Lock lock = new ReentrantLock();
  //Used to block flush threads until all have completed any work on the
  //element map.
  final Condition flushCond = lock.newCondition();
  //Number of flush flushWaiters waiting to flush.
  private int flushWaiters = 0;
  //Memory usage counter.
  private long memoryUsage=0;
@@ -90,12 +75,6 @@
  private final static int TREEMAP_ENTRY_OVERHEAD = 29;
  private final static int KEY_ELEMENT_OVERHEAD = 28;
  //Count of number of worker threads allowed to flush at the end of the run.
  private int flushThreadNumber;
  //Used to prevent memory flush
  private boolean limitFlush = true;
  /**
   * Create buffer manager instance.
@@ -106,11 +85,6 @@
  public BufferManager(long memoryLimit, int importThreadCount) {
    this.memoryLimit = memoryLimit;
    this.nextElem = null;
    //This limits the number of flush threads to 10 or less.
    if(importThreadCount > MIN_FLUSH_THREAD_NUM)
     this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
    else
      this.flushThreadNumber = importThreadCount;
  }
  /**
@@ -150,7 +124,7 @@
       }
       //If over the memory limit and import hasn't completed
      //flush some keys from the cache to make room.
       if((memoryUsage > memoryLimit) && limitFlush) {
       if(memoryUsage > memoryLimit) {
         flushUntilUnderLimit();
       }
    }
@@ -168,7 +142,7 @@
    } else {
      iter = elementMap.tailMap(nextElem).keySet().iterator();
    }
    while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) {
    while((memoryUsage + extraBytes) > memoryLimit) {
      if(iter.hasNext()) {
        KeyHashElement curElem = iter.next();
        //Never flush undefined elements.
@@ -177,10 +151,8 @@
          index.insert(null, new DatabaseEntry(curElem.getKey()),
                  curElem.getIDSet());
          memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
          if(limitFlush) {
            iter.remove();
          }
        }
      } else {
        //Wrapped around, start at the first element.
        nextElem = elementMap.firstKey();
@@ -196,7 +168,6 @@
   * ldif load.
   */
  void prepareFlush() {
    limitFlush=false;
    Message msg =
           NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
    logError(msg);
@@ -207,51 +178,16 @@
   * share the buffer among the worker threads so this function can be
   * multi-threaded.
   *
   * @param id The thread id.
   * @throws DatabaseException If an error occurred during the insert.
   * @throws InterruptedException If a thread has been interrupted.
   */
  void flushAll(int id) throws DatabaseException, InterruptedException {
    //If the thread ID is greater than the flush thread max return.
    if(id > flushThreadNumber) {
      return;
    } else if (flushThreadNumber > 1) {
       waitToFlush();
    }
  void flushAll() throws DatabaseException {
    TreeSet<KeyHashElement>  tSet =
            new TreeSet<KeyHashElement>(elementMap.keySet());
    Iterator<KeyHashElement> iter = tSet.iterator();
    int i=0;
    while(iter.hasNext()) {
      KeyHashElement curElem = iter.next();
    for (KeyHashElement curElem : tSet) {
      Index index = curElem.getIndex();
      //Each thread handles a piece of the buffer based on its thread id.
      if((i % flushThreadNumber) == id) {
        index.insert(null, new DatabaseEntry(curElem.getKey()),
                curElem.getIDSet());
      }
      i++;
    }
  }
  /**
   * Make the threads that are going to flush wait until all threads have
   * completed inserts into the element map. This prevents concurrency
   * exceptions, especially if the import has been configured for a large
   * number of threads.
   *
   * @throws InterruptedException  If the threads are interrupted.
   */
  private void waitToFlush() throws InterruptedException {
    lock.lock();
    try {
      if(flushWaiters++ < flushThreadNumber)
        flushCond.await();
      else
        flushCond.signalAll();
    } finally {
      lock.unlock();
    }
  }
  /**
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -489,7 +489,7 @@
  private void abortImport() throws JebException {
    //Stop work threads telling them to skip substring flush.
     stopWorkThreads(false, true);
     stopWorkThreads(false);
     timer.cancel();
     Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
     throw new JebException(message);
@@ -498,17 +498,13 @@
  /**
   * Stop work threads.
   *
   * @param flushBuffer Flag telling threads that it should do substring flush.
   * @param abort <CODE>True</CODE> if stop work threads was called from an
   *              abort.
   * @throws JebException if a Jeb error occurs.
   */
  private void
  stopWorkThreads(boolean flushBuffer, boolean abort) throws JebException {
  stopWorkThreads(boolean abort) throws JebException {
    for (WorkThread t : threads) {
      if(!flushBuffer) {
        t.setFlush(false);
      }
      t.stopProcessing();
    }
    // Wait for each thread to stop.
@@ -537,13 +533,14 @@
     Message msg;
    //Drain the work queue.
    drainWorkQueue();
    //Prepare the buffer managers to flush.
    for(DNContext context : importMap.values()) {
      context.getBufferManager().prepareFlush();
    }
    pTask.setPause(true);
    long startTime = System.currentTimeMillis();
    stopWorkThreads(true, false);
    stopWorkThreads(true);
    //Flush the buffer managers.
    for(DNContext context : importMap.values()) {
      context.getBufferManager().prepareFlush();
      context.getBufferManager().flushAll();
    }
    long finishTime = System.currentTimeMillis();
    long flushTime = (finishTime - startTime) / 1000;
     msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
@@ -76,9 +76,6 @@
  //The substring buffer manager to use.
  private BufferManager bufferMgr;
  //Flag set when substring buffer should be flushed.
  private boolean flushBuffer = true;
  /**
   * Create a work thread instance using the specified parameters.
   *
@@ -112,15 +109,6 @@
    stopRequested = true;
  }
  /**
   * Tells thread to flush substring buffer.
   *
   * @param flush Set to false if substring flush should be skipped.
   */
   void setFlush(boolean flush) {
    this.flushBuffer = flush;
  }
  /**
   * Run the thread. Read from item from queue and give it to the
   * buffer manage, unless told to stop. Once stopped, ask buffer manager
@@ -143,9 +131,6 @@
          }
        }
      } while (!stopRequested);
      if(flushBuffer) {
        bufferMgr.flushAll(threadNumber);
      }
    } catch (Exception e) {
      if (debugEnabled()) {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -403,6 +388,7 @@
      IDs.set(0, entryID);
    }
    else {
      EntryID nodeID;
      IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
      IDs.add(entryID);
      if (parentID != null)
@@ -411,17 +397,40 @@
        EntryContainer ec = context.getEntryContainer();
        for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
             dn = ec.getParentWithinBase(dn)) {
          EntryID nodeID = dn2id.get(txn, dn);
          if((nodeID =  getAncestorID(dn2id, dn, txn)) == null) {
            return false;
          } else {
          IDs.add(nodeID);
        }
      }
    }
    }
    context.setParentDN(parentDN);
    context.setIDs(IDs);
    entry.setAttachment(IDs);
    return true;
  }
  private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
          throws DatabaseException {
    int i=0;
    EntryID nodeID = dn2id.get(txn, dn);
    if(nodeID == null) {
      while((nodeID = dn2id.get(txn, dn)) == null) {
        try {
          Thread.sleep(50);
          if(i == 3) {
            return null;
          }
          i++;
        } catch (Exception e) {
          return null;
        }
      }
    }
    return nodeID;
  }
  /**
   * Process the a entry from the work element into the dn2id DB.
   *