| | |
| | | import org.opends.messages.Message; |
| | | import static org.opends.messages.JebMessages.*; |
| | | import java.util.*; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.concurrent.locks.Condition; |
| | | |
| | | |
| | | /** |
| | |
| | | |
| | | public class BufferManager { |
| | | |
| | | //Upper bound on the number of worker threads that take part in the final |
| | | //flush; the constructor caps flushThreadNumber at this value. |
| | | final static int MIN_FLUSH_THREAD_NUM = 2; |
| | | |
| | | //Lock used by the flush condition; waitToFlush() also updates the flush |
| | | //waiter count while holding it. |
| | | final Lock lock = new ReentrantLock(); |
| | | |
| | | //Used to block flush threads until all have completed any work on the |
| | | //element map. |
| | | final Condition flushCond = lock.newCondition(); |
| | | |
| | | //Number of flush threads that have entered waitToFlush(). |
| | | private int flushWaiters = 0; |
| | | |
| | | //Memory usage counter. |
| | | private long memoryUsage=0; |
| | | |
| | |
| | | //NOTE(review): presumably per-entry size overheads used when estimating |
| | | //memory usage; their use is not visible here - confirm. |
| | | private final static int TREEMAP_ENTRY_OVERHEAD = 29; |
| | | private final static int KEY_ELEMENT_OVERHEAD = 28; |
| | | |
| | | //Import worker thread count. |
| | | private int importThreadCount; |
| | | //Count of number of worker threads allowed to flush at the end of the run. |
| | | private int flushThreadNumber; |
| | | |
| | | //Used to prevent memory flush |
| | | private boolean limitFlush = true; |
| | |
| | | /** |
| | | * Create a buffer manager for the given memory limit and number of import |
| | | * worker threads. |
| | | * |
| | | * @param memoryLimit The memory limit to record for this buffer manager. |
| | | * @param importThreadCount The number of import worker threads. |
| | | */ |
| | | public BufferManager(long memoryLimit, int importThreadCount) { |
| | | this.memoryLimit = memoryLimit; |
| | | this.nextElem = null; |
| | | this.importThreadCount = importThreadCount; |
| | | //Cap the number of flush threads at MIN_FLUSH_THREAD_NUM; when there are |
| | | //fewer import threads than that, every import thread may flush. |
| | | this.flushThreadNumber = |
| | | Math.min(importThreadCount, MIN_FLUSH_THREAD_NUM); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Writes all of the buffer elements to DB. The thread id is used to |
| | | * divide the buffer among the worker threads so that this method can be |
| | | * called by several threads at once. |
| | | * |
| | | * @param id The thread id. |
| | | * @throws DatabaseException If an error occurred during the insert. |
| | | * @throws InterruptedException If a thread has been interrupted. |
| | | */ |
| | | void flushAll(int id) throws DatabaseException, InterruptedException { |
| | | //Threads whose id is greater than the flush thread count do not flush. |
| | | //NOTE(review): a thread with id == flushThreadNumber passes this check |
| | | //but never matches the modulo test below, so it only takes part in |
| | | //waitToFlush(). Confirm this is intended before tightening the check, |
| | | //because waitToFlush() counts that thread's arrival. |
| | | if(id > flushThreadNumber) { |
| | | return; |
| | | } |
| | | //With more than one flush thread, block until the other flush threads |
| | | //have arrived so the element map is no longer being modified. |
| | | if(flushThreadNumber > 1) { |
| | | waitToFlush(); |
| | | } |
| | | //Every flush thread iterates over its own sorted copy of the keys, so |
| | | //position i refers to the same element in each thread. |
| | | TreeSet<KeyHashElement> tSet = |
| | | new TreeSet<KeyHashElement>(elementMap.keySet()); |
| | | int i = 0; |
| | | for(KeyHashElement curElem : tSet) { |
| | | Index index = curElem.getIndex(); |
| | | //Each thread handles a piece of the buffer based on its thread id. |
| | | if((i % flushThreadNumber) == id) { |
| | | index.insert(null, new DatabaseEntry(curElem.getKey()), |
| | | curElem.getIDSet()); |
| | | } |
| | | i++; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Make the threads that are going to flush wait until all threads have |
| | | * completed inserts into the element map. This prevents concurrency |
| | | * exceptions, especially if the import has been configured for a large |
| | | * number of threads. |
| | | * |
| | | * @throws InterruptedException If the threads are interrupted. |
| | | */ |
| | | private void waitToFlush() throws InterruptedException { |
| | | lock.lock(); |
| | | try { |
| | | //Register this thread's arrival. The barrier opens once more than |
| | | //flushThreadNumber threads have arrived, the same count as before. |
| | | flushWaiters++; |
| | | if(flushWaiters > flushThreadNumber) { |
| | | //Enough threads have arrived; release every waiting thread. |
| | | flushCond.signalAll(); |
| | | } else { |
| | | //Re-check the arrival count after every wakeup: Condition.await() |
| | | //may return spuriously, and a thread released too early would |
| | | //start flushing before the barrier has opened. |
| | | while(flushWaiters <= flushThreadNumber) { |
| | | flushCond.await(); |
| | | } |
| | | } |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | } |
| | | |