| | |
| | | import org.opends.messages.Message; |
| | | import static org.opends.messages.JebMessages.*; |
| | | import java.util.*; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.concurrent.locks.Condition; |
| | | |
| | | |
| | | /** |
| | |
| | | |
| | | public class BufferManager { |
| | | |
| | | final static int MIN_FLUSH_THREAD_NUM = 2; |
| | | |
| | | //Lock used by the flush condition. |
| | | final Lock lock = new ReentrantLock(); |
| | | |
| | | //Used to block flush threads until all have completed any work on the |
| | | //element map. |
| | | final Condition flushCond = lock.newCondition(); |
| | | |
| | | //Number of flush threads currently waiting to flush. |
| | | private int flushWaiters = 0; |
| | | |
| | | //Memory usage counter. |
| | | private long memoryUsage=0; |
| | | |
| | |
| | | private final static int TREEMAP_ENTRY_OVERHEAD = 29; |
| | | private final static int KEY_ELEMENT_OVERHEAD = 28; |
| | | |
| | | //Count of number of worker threads allowed to flush at the end of the run. |
| | | private int flushThreadNumber; |
| | | |
| | | //Used to prevent memory flush |
| | | private boolean limitFlush = true; |
| | | |
| | | |
| | | /** |
| | | * Create buffer manager instance. |
| | |
| | | public BufferManager(long memoryLimit, int importThreadCount) { |
| | | this.memoryLimit = memoryLimit; |
| | | this.nextElem = null; |
| | | //Cap the flush thread count at MIN_FLUSH_THREAD_NUM; importThreadCount is used as-is when it does not exceed that cap. |
| | | if(importThreadCount > MIN_FLUSH_THREAD_NUM) |
| | | this.flushThreadNumber = MIN_FLUSH_THREAD_NUM; |
| | | else |
| | | this.flushThreadNumber = importThreadCount; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | //If over the memory limit and import hasn't completed |
| | | //flush some keys from the cache to make room. |
| | | if((memoryUsage > memoryLimit) && limitFlush) { |
| | | if(memoryUsage > memoryLimit) { |
| | | flushUntilUnderLimit(); |
| | | } |
| | | } |
| | |
| | | } else { |
| | | iter = elementMap.tailMap(nextElem).keySet().iterator(); |
| | | } |
| | | while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) { |
| | | while((memoryUsage + extraBytes) > memoryLimit) { |
| | | if(iter.hasNext()) { |
| | | KeyHashElement curElem = iter.next(); |
| | | //Never flush undefined elements. |
| | |
| | | index.insert(null, new DatabaseEntry(curElem.getKey()), |
| | | curElem.getIDSet()); |
| | | memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize(); |
| | | if(limitFlush) { |
| | | iter.remove(); |
| | | } |
| | | iter.remove(); |
| | | } |
| | | } else { |
| | | //Wrapped around, start at the first element. |
| | |
| | | * ldif load. |
| | | */ |
| | | void prepareFlush() { |
| | | limitFlush=false; |
| | | Message msg = |
| | | NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit); |
| | | logError(msg); |
| | |
| | | * share the buffer among the worker threads so this function can be |
| | | * multi-threaded. |
| | | * |
| | | * @param id The thread id. |
| | | * @throws DatabaseException If an error occurred during the insert. |
| | | * @throws InterruptedException If a thread has been interrupted. |
| | | */ |
| | | void flushAll(int id) throws DatabaseException, InterruptedException { |
| | | //Threads whose id is greater than flushThreadNumber return without flushing; the rest first block in waitToFlush() when flushThreadNumber > 1. NOTE(review): id == flushThreadNumber passes this check but can never equal i % flushThreadNumber below - confirm this is intended. |
| | | if(id > flushThreadNumber) { |
| | | return; |
| | | } else if (flushThreadNumber > 1) { |
| | | waitToFlush(); |
| | | } |
| | | void flushAll() throws DatabaseException { |
| | | TreeSet<KeyHashElement> tSet = |
| | | new TreeSet<KeyHashElement>(elementMap.keySet()); |
| | | Iterator<KeyHashElement> iter = tSet.iterator(); |
| | | int i=0; |
| | | while(iter.hasNext()) { |
| | | KeyHashElement curElem = iter.next(); |
| | | for (KeyHashElement curElem : tSet) { |
| | | Index index = curElem.getIndex(); |
| | | //Element number i of the sorted key copy is inserted only by the thread whose id equals i % flushThreadNumber. |
| | | if((i % flushThreadNumber) == id) { |
| | | index.insert(null, new DatabaseEntry(curElem.getKey()), |
| | | curElem.getIDSet()); |
| | | } |
| | | i++; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Make the threads that are going to flush wait until all threads have |
| | | * completed inserts into the element map. This prevents concurrency |
| | | * exceptions, especially if the import has been configured for a large |
| | | * number of threads. |
| | | * |
| | | * While holding the lock, each caller reads flushWaiters and then increments |
| | | * it. A caller that read a value below flushThreadNumber blocks on flushCond; |
| | | * a caller that read flushThreadNumber or more wakes every blocked caller |
| | | * with signalAll(). The lock is released in the finally block. |
| | | * |
| | | * NOTE(review): await() sits under an if rather than inside a loop that |
| | | * re-checks a condition, so a spurious wakeup would let a waiter proceed |
| | | * before signalAll() has been called - confirm whether that is acceptable. |
| | | * |
| | | * NOTE(review): the index.insert(...) statement in the finally block uses |
| | | * index and curElem, neither of which is declared in this method; it looks |
| | | * like a leftover from another method - confirm before relying on it. |
| | | * |
| | | * @throws InterruptedException If the threads are interrupted. |
| | | */ |
| | | private void waitToFlush() throws InterruptedException { |
| | | lock.lock(); |
| | | try { |
| | | if(flushWaiters++ < flushThreadNumber) |
| | | flushCond.await(); |
| | | else |
| | | flushCond.signalAll(); |
| | | } finally { |
| | | lock.unlock(); |
| | | index.insert(null, new DatabaseEntry(curElem.getKey()), |
| | | curElem.getIDSet()); |
| | | } |
| | | } |
| | | |