mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

dugan
01.34.2008 98b7ac3c69f631226e11dbd242025b49a811ea10
opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -39,9 +39,6 @@
import org.opends.messages.Message;
import static org.opends.messages.JebMessages.*;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
/**
@@ -53,18 +50,6 @@
public class BufferManager {
  final static int MIN_FLUSH_THREAD_NUM = 2;
  //Lock used by the flush condition.
  final Lock lock = new ReentrantLock();
  //Used to block flush threads until all have completed any work on the
  //element map.
  final Condition flushCond = lock.newCondition();
  //Number of flush flushWaiters waiting to flush.
  private int flushWaiters = 0;
  //Memory usage counter.
  private long memoryUsage=0;
@@ -90,12 +75,6 @@
  private final static int TREEMAP_ENTRY_OVERHEAD = 29;
  private final static int KEY_ELEMENT_OVERHEAD = 28;
  //Count of number of worker threads allowed to flush at the end of the run.
  private int flushThreadNumber;
  //Used to prevent memory flush
  private boolean limitFlush = true;
  /**
   * Create buffer manager instance.
@@ -106,11 +85,6 @@
  public BufferManager(long memoryLimit, int importThreadCount) {
    this.memoryLimit = memoryLimit;
    this.nextElem = null;
    //This limits the number of flush threads to 10 or less.
    if(importThreadCount > MIN_FLUSH_THREAD_NUM)
     this.flushThreadNumber = MIN_FLUSH_THREAD_NUM;
    else
      this.flushThreadNumber = importThreadCount;
  }
  /**
@@ -150,7 +124,7 @@
       }
       //If over the memory limit and import hasn't completed
      //flush some keys from the cache to make room.
       if((memoryUsage > memoryLimit) && limitFlush) {
       if(memoryUsage > memoryLimit) {
         flushUntilUnderLimit();
       }
    }
@@ -168,7 +142,7 @@
    } else {
      iter = elementMap.tailMap(nextElem).keySet().iterator();
    }
    while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) {
    while((memoryUsage + extraBytes) > memoryLimit) {
      if(iter.hasNext()) {
        KeyHashElement curElem = iter.next();
        //Never flush undefined elements.
@@ -177,9 +151,7 @@
          index.insert(null, new DatabaseEntry(curElem.getKey()),
                  curElem.getIDSet());
          memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
          if(limitFlush) {
            iter.remove();
          }
          iter.remove();
        }
      } else {
        //Wrapped around, start at the first element.
@@ -196,7 +168,6 @@
   * ldif load.
   */
  void prepareFlush() {
    limitFlush=false;
    Message msg =
           NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
    logError(msg);
@@ -207,50 +178,15 @@
   * share the buffer among the worker threads so this function can be
   * multi-threaded.
   *
   * @param id The thread id.
   * @throws DatabaseException If an error occurred during the insert.
   * @throws InterruptedException If a thread has been interrupted.
   */
  void flushAll(int id) throws DatabaseException, InterruptedException {
    //If the thread ID is greater than the flush thread max return.
    if(id > flushThreadNumber) {
      return;
    } else if (flushThreadNumber > 1) {
       waitToFlush();
    }
  void flushAll() throws DatabaseException {
    TreeSet<KeyHashElement>  tSet =
            new TreeSet<KeyHashElement>(elementMap.keySet());
    Iterator<KeyHashElement> iter = tSet.iterator();
    int i=0;
    while(iter.hasNext()) {
      KeyHashElement curElem = iter.next();
    for (KeyHashElement curElem : tSet) {
      Index index = curElem.getIndex();
      //Each thread handles a piece of the buffer based on its thread id.
      if((i % flushThreadNumber) == id) {
        index.insert(null, new DatabaseEntry(curElem.getKey()),
                curElem.getIDSet());
      }
      i++;
    }
  }
  /**
   * Make the threads that are going to flush wait until all threads have
   * completed inserts into the element map. This prevents concurrency
   * exceptions, especially if the import has been configured for a large
   * number of threads.
   *
   * @throws InterruptedException  If the threads are interrupted.
   */
  private void waitToFlush() throws InterruptedException {
    lock.lock();
    try {
      if(flushWaiters++ < flushThreadNumber)
        flushCond.await();
      else
        flushCond.signalAll();
    } finally {
      lock.unlock();
      index.insert(null, new DatabaseEntry(curElem.getKey()),
              curElem.getIDSet());
    }
  }