| | |
| | | |
| | | import java.io.*; |
| | | import java.nio.ByteBuffer; |
| | | import java.nio.channels.FileChannel; |
| | | import java.util.*; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | |
| | | final int limit = Math.min(dbThreads, totList.size()); |
| | | for (int i = 0; i < limit; i++) |
| | | { |
| | | buffers += totList.get(i).bufferIndexCount; |
| | | buffers += totList.get(i).numberOfBuffers; |
| | | } |
| | | |
| | | readAheadSize = (int) (usableMemory / buffers); |
| | |
| | | } |
| | | else |
| | | { |
| | | // Not enough memory. |
| | | final long minimumPhaseTwoBufferMemory = buffers |
| | | * MIN_READ_AHEAD_CACHE_SIZE; |
| | | Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory, |
| | | minimumPhaseTwoBufferMemory + dbCacheSize); |
| | | throw new InitializationException(message); |
| | | // Not enough memory - will need to do batching for the biggest indexes. |
| | | readAheadSize = MIN_READ_AHEAD_CACHE_SIZE; |
| | | buffers = (int) (usableMemory / readAheadSize); |
| | | |
| | | Message message = WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO.get(usableMemory); |
| | | logError(message); |
| | | break; |
| | | } |
| | | } |
| | | |
| | | // Ensure that there are always two threads available for parallel |
| | | // processing of smaller indexes. |
| | | dbThreads = Math.max(2, dbThreads); |
| | | |
| | | Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get( |
| | | availableMemory, readAheadSize, buffers); |
| | | logError(message); |
| | |
| | | // Start indexing tasks. |
| | | List<Future<Void>> futures = new LinkedList<Future<Void>>(); |
| | | ExecutorService dbService = Executors.newFixedThreadPool(dbThreads); |
| | | Semaphore permits = new Semaphore(buffers); |
| | | |
| | | // Start DN processing first. |
| | | for (IndexManager dnMgr : DNIndexMgrList) |
| | | { |
| | | futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize))); |
| | | futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits, |
| | | buffers, readAheadSize))); |
| | | } |
| | | for (IndexManager mgr : indexMgrList) |
| | | { |
| | | futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize))); |
| | | futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers, |
| | | readAheadSize))); |
| | | } |
| | | |
| | | for (Future<Void> result : futures) |
| | |
| | | private final DatabaseEntry dbKey, dbValue; |
| | | private final int cacheSize; |
| | | private final Map<Integer, DNState> dnStateMap = |
| | | new HashMap<Integer, DNState>(); |
| | | new HashMap<Integer, DNState>(); |
| | | private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>(); |
| | | private final Semaphore permits; |
| | | private final int maxPermits; |
| | | private final AtomicLong bytesRead = new AtomicLong(); |
| | | private long lastBytesRead = 0; |
| | | private final AtomicInteger keyCount = new AtomicInteger(); |
| | | private RandomAccessFile bufferFile = null; |
| | | private DataInputStream bufferIndexFile = null; |
| | | private int remainingBuffers; |
| | | private volatile int totalBatches; |
| | | private AtomicInteger batchNumber = new AtomicInteger(); |
| | | private int nextBufferID; |
| | | private int ownedPermits; |
| | | private volatile boolean isRunning = false; |
| | | |
| | | |
| | | public IndexDBWriteTask(IndexManager indexMgr, int cacheSize) |
| | | /** |
| | | * Creates a new index DB writer. |
| | | * |
| | | * @param indexMgr |
| | | * The index manager whose buffer file and buffer index file this |
| | | * task reads. |
| | | * @param permits |
| | | * The semaphore used for restricting the number of buffer |
| | | * allocations. |
| | | * @param maxPermits |
| | | * The maximum number of buffers which can be allocated; this is |
| | | * also the upper bound on the size of a single batch. |
| | | * @param cacheSize |
| | | * The buffer cache size handed to every input buffer created by |
| | | * this task. |
| | | */ |
| | | public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits, |
| | | int maxPermits, int cacheSize) |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | this.permits = permits; |
| | | this.maxPermits = maxPermits; |
| | | // cacheSize is a final field, so it must be assigned exactly once. |
| | | this.cacheSize = cacheSize; |
| | | |
| | | this.dbKey = new DatabaseEntry(); |
| | | this.dbValue = new DatabaseEntry(); |
| | | } |
| | | |
| | | |
| | | private NavigableSet<IndexInputBuffer> initializeBuffers() |
| | | throws IOException |
| | | |
| | | /** |
| | | * Initializes this task: opens the buffer file and the buffer index file |
| | | * for reading, resets the batch bookkeeping, logs the start message, |
| | | * registers this task with the index manager and marks it as running. |
| | | * |
| | | * @throws IOException |
| | | * If an IO error occurred while opening either file. |
| | | */ |
| | | public void beginWriteTask() throws IOException |
| | | { |
| | | // Open each scratch file exactly once; endWriteTask() closes them. |
| | | bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r"); |
| | | bufferIndexFile = new DataInputStream(new BufferedInputStream( |
| | | new FileInputStream(indexMgr.getBufferIndexFile()))); |
| | | |
| | | remainingBuffers = indexMgr.getNumberOfBuffers(); |
| | | // Each batch takes at most maxPermits buffers, so the batch count is the |
| | | // ceiling of remainingBuffers / maxPermits. A plain "+ 1" over-counts by |
| | | // one whenever the buffers divide evenly into batches. |
| | | totalBatches = (remainingBuffers / maxPermits) |
| | | + ((remainingBuffers % maxPermits == 0) ? 0 : 1); |
| | | batchNumber.set(0); |
| | | nextBufferID = 0; |
| | | ownedPermits = 0; |
| | | |
| | | Message message = NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get( |
| | | indexMgr.getBufferFileName(), remainingBuffers, totalBatches); |
| | | logError(message); |
| | | |
| | | // Publish this task to the index manager before flagging it as running so |
| | | // that progress reporting can reach printStats(). |
| | | indexMgr.setIndexDBWriteTask(this); |
| | | isRunning = true; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the next batch of buffers to be processed, blocking until enough |
| | | * buffer permits are available. Permits held for the previous batch are |
| | | * released first. The batch size is the smaller of the number of buffers |
| | | * still to be read and {@code maxPermits}. |
| | | * |
| | | * @return The next batch of buffers, or {@code null} if there are no more |
| | | * buffers to be processed. |
| | | * @throws Exception |
| | | * If an exception occurred, for example while waiting for permits |
| | | * or while reading the buffer index file. |
| | | */ |
| | | public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception |
| | | { |
| | | // First release any previously acquired permits. |
| | | if (ownedPermits > 0) |
| | | { |
| | | permits.release(ownedPermits); |
| | | ownedPermits = 0; |
| | | } |
| | | |
| | | // GC arrays. Buffer offsets are read from the buffer index file below, so |
| | | // this method does not need the index manager's in-memory copies. |
| | | indexMgr.bufferIndexBegin = null; |
| | | indexMgr.bufferIndexEnd = null; |
| | | indexMgr.bufferIndexID = null; |
| | | |
| | | // Block until we can either get enough permits for all buffers, or the |
| | | // maximum number of permits. |
| | | final int permitRequest = Math.min(remainingBuffers, maxPermits); |
| | | if (permitRequest == 0) |
| | | { |
| | | // No more work to do. |
| | | return null; |
| | | } |
| | | permits.acquire(permitRequest); |
| | | |
| | | // Update counters. ownedPermits is recorded before any further IO so that |
| | | // endWriteTask() gives the permits back even if reading the index fails. |
| | | ownedPermits = permitRequest; |
| | | remainingBuffers -= permitRequest; |
| | | batchNumber.incrementAndGet(); |
| | | |
| | | // Create all the index buffers for the next batch. Each buffer is described |
| | | // by two consecutive longs (begin, end) in the buffer index file. |
| | | final NavigableSet<IndexInputBuffer> buffers = |
| | | new TreeSet<IndexInputBuffer>(); |
| | | for (int i = 0; i < permitRequest; i++) |
| | | { |
| | | final long bufferBegin = bufferIndexFile.readLong(); |
| | | final long bufferEnd = bufferIndexFile.readLong(); |
| | | final IndexInputBuffer b = new IndexInputBuffer(indexMgr, |
| | | bufferFile.getChannel(), bufferBegin, bufferEnd, nextBufferID++, |
| | | cacheSize); |
| | | buffers.add(b); |
| | | } |
| | | |
| | | return buffers; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Finishes this task. Marks the task as no longer running, returns any |
| | | * permits it still holds, flushes DN state or closes index cursors, and |
| | | * finally closes and deletes the buffer file and the buffer index file. |
| | | * |
| | | * @throws Exception |
| | | * If an exception occurred. |
| | | */ |
| | | public void endWriteTask() throws Exception |
| | | { |
| | | isRunning = false; |
| | | |
| | | // Hand back whatever permits the last batch was still holding. |
| | | if (ownedPermits > 0) |
| | | { |
| | | permits.release(ownedPermits); |
| | | ownedPermits = 0; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (!indexMgr.isDN2ID()) |
| | | { |
| | | for (Index openIndex : indexMap.values()) |
| | | { |
| | | openIndex.closeCursor(); |
| | | } |
| | | logError(NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr |
| | | .getBufferFileName())); |
| | | } |
| | | else |
| | | { |
| | | for (DNState state : dnStateMap.values()) |
| | | { |
| | | state.flush(); |
| | | } |
| | | logError(NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount())); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // Close both scratch files on a best-effort basis: a failure to close one |
| | | // must not prevent closing the other or deleting the files. |
| | | final Closeable[] scratchFiles = { bufferFile, bufferIndexFile }; |
| | | for (Closeable scratchFile : scratchFiles) |
| | | { |
| | | if (scratchFile == null) |
| | | { |
| | | continue; |
| | | } |
| | | try |
| | | { |
| | | scratchFile.close(); |
| | | } |
| | | catch (IOException ignored) |
| | | { |
| | | // Ignore. |
| | | } |
| | | } |
| | | |
| | | indexMgr.getBufferFile().delete(); |
| | | indexMgr.getBufferIndexFile().delete(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Print out progress stats. Nothing is logged unless this task is currently |
| | | * running. |
| | | * |
| | | * @param deltaTime |
| | | * The time since the last update. A value of zero is tolerated and |
| | | * reported as a rate of zero. |
| | | */ |
| | | public void printStats(long deltaTime) |
| | | { |
| | | if (isRunning) |
| | | { |
| | | final long bufferFileSize = indexMgr.getBufferFileSize(); |
| | | final long tmpBytesRead = bytesRead.get(); |
| | | final int currentBatch = batchNumber.get(); |
| | | |
| | | final long bytesReadInterval = tmpBytesRead - lastBytesRead; |
| | | final int bytesReadPercent = Math.round((100f * tmpBytesRead) |
| | | / bufferFileSize); |
| | | |
| | | // Kilo and milli approximately cancel out. Guard the integer division: |
| | | // a zero interval would otherwise raise ArithmeticException. |
| | | final long kiloBytesRate = (deltaTime != 0) |
| | | ? bytesReadInterval / deltaTime : 0; |
| | | final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024; |
| | | |
| | | Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get( |
| | | indexMgr.getBufferFileName(), bytesReadPercent, kiloBytesRemaining, |
| | | kiloBytesRate, currentBatch, totalBatches); |
| | | logError(message); |
| | | |
| | | // Remember the running total so the next report covers only new bytes. |
| | | lastBytesRead = tmpBytesRead; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public Void call() throws Exception |
| | | { |
| | | Thread.setDefaultUncaughtExceptionHandler( |
| | | new DefaultExceptionHandler()); |
| | | indexMgr.setStarted(); |
| | | Message message = |
| | | NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(), |
| | | indexMgr.bufferIndexCount); |
| | | logError(message); |
| | | Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler()); |
| | | |
| | | ByteBuffer key = null; |
| | | ImportIDSet insertIDSet = null; |
| | |
| | | |
| | | try |
| | | { |
| | | indexMgr.openIndexFile(); |
| | | NavigableSet<IndexInputBuffer> bufferSet = initializeBuffers(); |
| | | while (!bufferSet.isEmpty()) |
| | | { |
| | | IndexInputBuffer b = bufferSet.pollFirst(); |
| | | if (key == null) |
| | | { |
| | | indexID = b.getIndexID(); |
| | | beginWriteTask(); |
| | | |
| | | if (indexMgr.isDN2ID()) |
| | | NavigableSet<IndexInputBuffer> bufferSet; |
| | | while ((bufferSet = getNextBufferBatch()) != null) |
| | | { |
| | | while (!bufferSet.isEmpty()) |
| | | { |
| | | IndexInputBuffer b = bufferSet.pollFirst(); |
| | | if (key == null) |
| | | { |
| | | insertIDSet = new ImportIDSet(1, 1, false); |
| | | deleteIDSet = new ImportIDSet(1, 1, false); |
| | | indexID = b.getIndexID(); |
| | | |
| | | if (indexMgr.isDN2ID()) |
| | | { |
| | | insertIDSet = new ImportIDSet(1, 1, false); |
| | | deleteIDSet = new ImportIDSet(1, 1, false); |
| | | } |
| | | else |
| | | { |
| | | Index index = (Index) idContainerMap.get(indexID); |
| | | int limit = index.getIndexEntryLimit(); |
| | | boolean doCount = index.getMaintainCount(); |
| | | insertIDSet = new ImportIDSet(1, limit, doCount); |
| | | deleteIDSet = new ImportIDSet(1, limit, doCount); |
| | | } |
| | | |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | key.flip(); |
| | | b.getKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | | insertIDSet.setKey(key); |
| | | deleteIDSet.setKey(key); |
| | | } |
| | | else if (b.compare(key, indexID) != 0) |
| | | { |
| | | addToDB(insertIDSet, deleteIDSet, indexID); |
| | | keyCount.incrementAndGet(); |
| | | |
| | | indexID = b.getIndexID(); |
| | | |
| | | if (indexMgr.isDN2ID()) |
| | | { |
| | | insertIDSet = new ImportIDSet(1, 1, false); |
| | | deleteIDSet = new ImportIDSet(1, 1, false); |
| | | } |
| | | else |
| | | { |
| | | Index index = (Index) idContainerMap.get(indexID); |
| | | int limit = index.getIndexEntryLimit(); |
| | | boolean doCount = index.getMaintainCount(); |
| | | insertIDSet = new ImportIDSet(1, limit, doCount); |
| | | deleteIDSet = new ImportIDSet(1, limit, doCount); |
| | | } |
| | | |
| | | key.clear(); |
| | | if (b.getKeyLen() > key.capacity()) |
| | | { |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | } |
| | | key.flip(); |
| | | b.getKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | | insertIDSet.setKey(key); |
| | | deleteIDSet.setKey(key); |
| | | } |
| | | else |
| | | { |
| | | Index index = (Index) idContainerMap.get(indexID); |
| | | int limit = index.getIndexEntryLimit(); |
| | | boolean doCount = index.getMaintainCount(); |
| | | insertIDSet = new ImportIDSet(1, limit, doCount); |
| | | deleteIDSet = new ImportIDSet(1, limit, doCount); |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | | } |
| | | |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | key.flip(); |
| | | b.getKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | | insertIDSet.setKey(key); |
| | | deleteIDSet.setKey(key); |
| | | if (b.hasMoreData()) |
| | | { |
| | | b.getNextRecord(); |
| | | bufferSet.add(b); |
| | | } |
| | | } |
| | | else if (b.compare(key, indexID) != 0) |
| | | |
| | | if (key != null) |
| | | { |
| | | addToDB(insertIDSet, deleteIDSet, indexID); |
| | | indexMgr.incrementKeyCount(); |
| | | |
| | | indexID = b.getIndexID(); |
| | | |
| | | if (indexMgr.isDN2ID()) |
| | | { |
| | | insertIDSet = new ImportIDSet(1, 1, false); |
| | | deleteIDSet = new ImportIDSet(1, 1, false); |
| | | } |
| | | else |
| | | { |
| | | Index index = (Index) idContainerMap.get(indexID); |
| | | int limit = index.getIndexEntryLimit(); |
| | | boolean doCount = index.getMaintainCount(); |
| | | insertIDSet = new ImportIDSet(1, limit, doCount); |
| | | deleteIDSet = new ImportIDSet(1, limit, doCount); |
| | | } |
| | | |
| | | key.clear(); |
| | | if (b.getKeyLen() > key.capacity()) |
| | | { |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | } |
| | | key.flip(); |
| | | b.getKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | | insertIDSet.setKey(key); |
| | | deleteIDSet.setKey(key); |
| | | } |
| | | else |
| | | { |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | | } |
| | | |
| | | if(b.hasMoreData()) |
| | | { |
| | | b.getNextRecord(); |
| | | bufferSet.add(b); |
| | | } |
| | | } |
| | | |
| | | if(key != null) |
| | | { |
| | | addToDB(insertIDSet, deleteIDSet, indexID); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | message = |
| | | ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr.getFileName(), |
| | | e.getMessage()); |
| | | Message message = ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get( |
| | | indexMgr.getBufferFileName(), e.getMessage()); |
| | | logError(message); |
| | | e.printStackTrace(); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | cleanUP(); |
| | | endWriteTask(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | | private void cleanUP() throws DatabaseException, DirectoryException, |
| | | IOException |
| | | { |
| | | try |
| | | { |
| | | if(indexMgr.isDN2ID()) |
| | | { |
| | | for(DNState dnState : dnStateMap.values()) |
| | | { |
| | | dnState.flush(); |
| | | } |
| | | Message msg = |
| | | NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount()); |
| | | logError(msg); |
| | | } |
| | | else |
| | | { |
| | | for(Index index : indexMap.values()) |
| | | { |
| | | index.closeCursor(); |
| | | } |
| | | Message message = |
| | | NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getFileName()); |
| | | logError(message); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | indexMgr.setDone(); |
| | | indexMgr.close(); |
| | | indexMgr.deleteIndexFile(); |
| | | } |
| | | } |
| | | |
| | | |
| | | private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet, |
| | | int indexID) throws InterruptedException, |
| | | DatabaseException, DirectoryException |
| | |
| | | } |
| | | |
| | | |
| | | /** |
| | | |
| | | /** |
| | | * Adds the given number of bytes to this task's running total of bytes |
| | | * read. |
| | | * |
| | | * @param bytesRead |
| | | * The number of additional bytes read. |
| | | */ |
| | | private void addBytesRead(int bytesRead) |
| | | { |
| | | // The parameter shadows the counter field, hence the explicit "this". |
| | | this.bytesRead.getAndAdd(bytesRead); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * This class is used by an index DB merge thread performing DN processing |
| | | * to keep track of the state of individual DN2ID index processing. |
| | | */ |
| | |
| | | private final int DRAIN_TO = 3; |
| | | private final IndexManager indexMgr; |
| | | private final BlockingQueue<IndexOutputBuffer> queue; |
| | | private final ByteArrayOutputStream insetByteStream = |
| | | private final ByteArrayOutputStream insertByteStream = |
| | | new ByteArrayOutputStream(2 * bufferSize); |
| | | private final ByteArrayOutputStream deleteByteStream = |
| | | new ByteArrayOutputStream(2 * bufferSize); |
| | | private final DataOutputStream bufferStream; |
| | | private final DataOutputStream bufferIndexStream; |
| | | private final byte[] tmpArray = new byte[8]; |
| | | private int insertKeyCount = 0, deleteKeyCount = 0; |
| | | private final DataOutputStream dataStream; |
| | | private int bufferCount = 0; |
| | | private final File file; |
| | | private final SortedSet<IndexOutputBuffer> indexSortedSet; |
| | | private boolean poisonSeen = false; |
| | | |
| | |
| | | IndexManager indexMgr) throws FileNotFoundException |
| | | { |
| | | this.queue = queue; |
| | | file = indexMgr.getFile(); |
| | | this.indexMgr = indexMgr; |
| | | BufferedOutputStream bufferedStream = |
| | | new BufferedOutputStream(new FileOutputStream(file), |
| | | READER_WRITER_BUFFER_SIZE); |
| | | dataStream = new DataOutputStream(bufferedStream); |
| | | indexSortedSet = new TreeSet<IndexOutputBuffer>(); |
| | | this.bufferStream = new DataOutputStream(new BufferedOutputStream( |
| | | new FileOutputStream(indexMgr.getBufferFile()), |
| | | READER_WRITER_BUFFER_SIZE)); |
| | | this.bufferIndexStream = new DataOutputStream(new BufferedOutputStream( |
| | | new FileOutputStream(indexMgr.getBufferIndexFile()), |
| | | READER_WRITER_BUFFER_SIZE)); |
| | | this.indexSortedSet = new TreeSet<IndexOutputBuffer>(); |
| | | } |
| | | |
| | | |
| | |
| | | } |
| | | } |
| | | offset += bufferLen; |
| | | indexMgr.addBuffer(beginOffset, offset, bufferCount); |
| | | |
| | | // Write buffer index information. |
| | | bufferIndexStream.writeLong(beginOffset); |
| | | bufferIndexStream.writeLong(offset); |
| | | |
| | | bufferCount++; |
| | | Importer.this.bufferCount.incrementAndGet(); |
| | | |
| | | if(poisonSeen) |
| | | { |
| | | break; |
| | |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | Message message = |
| | | ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get( |
| | | file.getAbsolutePath(), e.getMessage()); |
| | | Message message = ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(indexMgr |
| | | .getBufferFile().getAbsolutePath(), e.getMessage()); |
| | | logError(message); |
| | | isPhaseOneCanceled = true; |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | dataStream.close(); |
| | | indexMgr.setFileLength(); |
| | | bufferStream.close(); |
| | | bufferIndexStream.close(); |
| | | indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length()); |
| | | } |
| | | return null; |
| | | } |
| | |
| | | int numberKeys = indexBuffer.getNumberKeys(); |
| | | indexBuffer.setPosition(-1); |
| | | long bufferLen = 0; |
| | | insetByteStream.reset(); insertKeyCount = 0; |
| | | insertByteStream.reset(); insertKeyCount = 0; |
| | | deleteByteStream.reset(); deleteKeyCount = 0; |
| | | for(int i = 0; i < numberKeys; i++) |
| | | { |
| | |
| | | indexBuffer.setPosition(i); |
| | | if(indexBuffer.isInsert(i)) |
| | | { |
| | | indexBuffer.writeID(insetByteStream, i); |
| | | indexBuffer.writeID(insertByteStream, i); |
| | | insertKeyCount++; |
| | | } |
| | | else |
| | |
| | | { |
| | | bufferLen += writeRecord(indexBuffer); |
| | | indexBuffer.setPosition(i); |
| | | insetByteStream.reset();insertKeyCount = 0; |
| | | insertByteStream.reset();insertKeyCount = 0; |
| | | deleteByteStream.reset();deleteKeyCount = 0; |
| | | } |
| | | if(indexBuffer.isInsert(i)) |
| | | { |
| | | if(insertKeyCount++ <= indexMgr.getLimit()) |
| | | { |
| | | indexBuffer.writeID(insetByteStream, i); |
| | | indexBuffer.writeID(insertByteStream, i); |
| | | } |
| | | } |
| | | else |
| | |
| | | { |
| | | long id = 0; |
| | | long bufferLen = 0; |
| | | insetByteStream.reset(); insertKeyCount = 0; |
| | | insertByteStream.reset(); insertKeyCount = 0; |
| | | deleteByteStream.reset(); deleteKeyCount = 0; |
| | | for(IndexOutputBuffer b : buffers) |
| | | { |
| | |
| | | saveIndexID = b.getIndexID(); |
| | | if(b.isInsert(b.getPosition())) |
| | | { |
| | | b.writeID(insetByteStream, b.getPosition()); |
| | | b.writeID(insertByteStream, b.getPosition()); |
| | | insertKeyCount++; |
| | | } |
| | | else |
| | |
| | | if(!b.compare(saveKey, saveIndexID)) |
| | | { |
| | | bufferLen += writeRecord(saveKey, saveIndexID); |
| | | insetByteStream.reset(); |
| | | insertByteStream.reset(); |
| | | deleteByteStream.reset(); |
| | | insertKeyCount = 0; |
| | | deleteKeyCount = 0; |
| | |
| | | saveIndexID = b.getIndexID(); |
| | | if(b.isInsert(b.getPosition())) |
| | | { |
| | | b.writeID(insetByteStream, b.getPosition()); |
| | | b.writeID(insertByteStream, b.getPosition()); |
| | | insertKeyCount++; |
| | | } |
| | | else |
| | |
| | | { |
| | | if(insertKeyCount++ <= indexMgr.getLimit()) |
| | | { |
| | | b.writeID(insetByteStream, b.getPosition()); |
| | | b.writeID(insertByteStream, b.getPosition()); |
| | | } |
| | | } |
| | | else |
| | |
| | | if(insertKeyCount > indexMgr.getLimit()) |
| | | { |
| | | insertKeyCount = 1; |
| | | insetByteStream.reset(); |
| | | insertByteStream.reset(); |
| | | PackedInteger.writeInt(tmpArray, 0, -1); |
| | | insetByteStream.write(tmpArray, 0, 1); |
| | | insertByteStream.write(tmpArray, 0, 1); |
| | | } |
| | | int insertSize = PackedInteger.getWriteIntLength(insertKeyCount); |
| | | PackedInteger.writeInt(tmpArray, 0, insertKeyCount); |
| | | dataStream.write(tmpArray, 0, insertSize); |
| | | if(insetByteStream.size() > 0) |
| | | bufferStream.write(tmpArray, 0, insertSize); |
| | | if(insertByteStream.size() > 0) |
| | | { |
| | | insetByteStream.writeTo(dataStream); |
| | | insertByteStream.writeTo(bufferStream); |
| | | } |
| | | int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount); |
| | | PackedInteger.writeInt(tmpArray, 0, deleteKeyCount); |
| | | dataStream.write(tmpArray, 0, deleteSize); |
| | | bufferStream.write(tmpArray, 0, deleteSize); |
| | | if(deleteByteStream.size() > 0) |
| | | { |
| | | deleteByteStream.writeTo(dataStream); |
| | | deleteByteStream.writeTo(bufferStream); |
| | | } |
| | | return insertSize + deleteSize; |
| | | } |
| | |
| | | |
| | | private int writeHeader(int indexID, int keySize) throws IOException |
| | | { |
| | | dataStream.writeInt(indexID); |
| | | bufferStream.writeInt(indexID); |
| | | int packedSize = PackedInteger.getWriteIntLength(keySize); |
| | | PackedInteger.writeInt(tmpArray, 0, keySize); |
| | | dataStream.write(tmpArray, 0, packedSize); |
| | | bufferStream.write(tmpArray, 0, packedSize); |
| | | return packedSize; |
| | | } |
| | | |
| | |
| | | { |
| | | int keySize = b.getKeySize(); |
| | | int packedSize = writeHeader(b.getIndexID(), keySize); |
| | | b.writeKey(dataStream); |
| | | b.writeKey(bufferStream); |
| | | packedSize += writeByteStreams(); |
| | | return (packedSize + keySize + insetByteStream.size() + |
| | | return (packedSize + keySize + insertByteStream.size() + |
| | | deleteByteStream.size() + 4); |
| | | } |
| | | |
| | |
| | | private int writeRecord(byte[] k, int indexID) throws IOException |
| | | { |
| | | int packedSize = writeHeader(indexID, k.length); |
| | | dataStream.write(k); |
| | | bufferStream.write(k); |
| | | packedSize += writeByteStreams(); |
| | | return (packedSize + k.length + insetByteStream.size() + |
| | | return (packedSize + k.length + insertByteStream.size() + |
| | | deleteByteStream.size() + 4); |
| | | } |
| | | } |
| | |
| | | */ |
| | | final class IndexManager implements Comparable<IndexManager> |
| | | { |
| | | private static final int BUFFER_SIZE = 128; |
| | | |
| | | private final File file; |
| | | private RandomAccessFile rFile = null; |
| | | private long fileLength, bytesRead = 0; |
| | | private boolean done = false, started = false; |
| | | private final File bufferFile; |
| | | private final String bufferFileName; |
| | | private final File bufferIndexFile; |
| | | private final String bufferIndexFileName; |
| | | private long bufferFileSize; |
| | | private long totalDNS; |
| | | private AtomicInteger keyCount = new AtomicInteger(0); |
| | | private final String fileName; |
| | | private final boolean isDN; |
| | | private final int limit; |
| | | |
| | | private long[] bufferIndexBegin = new long[BUFFER_SIZE]; |
| | | private long[] bufferIndexEnd = new long[BUFFER_SIZE]; |
| | | private int[] bufferIndexID = new int[BUFFER_SIZE]; |
| | | private int bufferIndexCount = 0; |
| | | private int numberOfBuffers = 0; |
| | | private volatile IndexDBWriteTask writer = null; |
| | | |
| | | private IndexManager(String fileName, boolean isDN, int limit) |
| | | { |
| | | file = new File(tempDir, fileName); |
| | | this.fileName = fileName; |
| | | this.bufferFileName = fileName; |
| | | this.bufferIndexFileName = fileName + ".index"; |
| | | |
| | | this.bufferFile = new File(tempDir, bufferFileName); |
| | | this.bufferIndexFile = new File(tempDir, bufferIndexFileName); |
| | | |
| | | this.isDN = isDN; |
| | | this.limit = limit; |
| | | } |
| | | |
| | | |
| | | private void openIndexFile() throws FileNotFoundException |
| | | private void setIndexDBWriteTask(IndexDBWriteTask writer) |
| | | { |
| | | rFile = new RandomAccessFile(file, "r"); |
| | | this.writer = writer; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Returns the file channel associated with this index manager. |
| | | * |
| | | * @return The file channel associated with this index manager. |
| | | */ |
| | | FileChannel getChannel() |
| | | private File getBufferFile() |
| | | { |
| | | return rFile.getChannel(); |
| | | return bufferFile; |
| | | } |
| | | |
| | | |
| | | |
| | | private void addBuffer(long begin, long end, int id) |
| | | private long getBufferFileSize() |
| | | { |
| | | int size = bufferIndexBegin.length; |
| | | if (bufferIndexCount >= size) |
| | | { |
| | | size += BUFFER_SIZE; |
| | | bufferIndexBegin = Arrays.copyOf(bufferIndexBegin, size); |
| | | bufferIndexEnd = Arrays.copyOf(bufferIndexEnd, size); |
| | | bufferIndexID = Arrays.copyOf(bufferIndexID, size); |
| | | } |
| | | bufferIndexBegin[bufferIndexCount] = begin; |
| | | bufferIndexEnd[bufferIndexCount] = end; |
| | | bufferIndexID[bufferIndexCount] = id; |
| | | bufferIndexCount++; |
| | | return bufferFileSize; |
| | | } |
| | | |
| | | |
| | | |
| | | private File getFile() |
| | | private File getBufferIndexFile() |
| | | { |
| | | return file; |
| | | return bufferIndexFile; |
| | | } |
| | | |
| | | |
| | | private boolean deleteIndexFile() |
| | | private void setBufferInfo(int numberOfBuffers, long bufferFileSize) |
| | | { |
| | | return file.delete(); |
| | | } |
| | | |
| | | |
| | | private void close() throws IOException |
| | | { |
| | | rFile.close(); |
| | | } |
| | | |
| | | |
| | | private void setFileLength() |
| | | { |
| | | this.fileLength = file.length(); |
| | | this.numberOfBuffers = numberOfBuffers; |
| | | this.bufferFileSize = bufferFileSize; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | void addBytesRead(int bytesRead) |
| | | { |
| | | this.bytesRead += bytesRead; |
| | | } |
| | | |
| | | |
| | | private void setDone() |
| | | { |
| | | this.done = true; |
| | | } |
| | | |
| | | |
| | | private void setStarted() |
| | | { |
| | | started = true; |
| | | if (writer != null) |
| | | { |
| | | writer.addBytesRead(bytesRead); |
| | | } |
| | | } |
| | | |
| | | |
| | | private void addTotDNCount(int delta) |
| | | { |
| | | this.totalDNS += delta; |
| | | totalDNS += delta; |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | private void printStats(long deltaTime) |
| | | { |
| | | if(!done && started) |
| | | if (writer != null) |
| | | { |
| | | float rate = 1000f * keyCount.getAndSet(0) / deltaTime; |
| | | Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(fileName, |
| | | (fileLength - bytesRead), rate); |
| | | logError(message); |
| | | writer.printStats(deltaTime); |
| | | } |
| | | } |
| | | |
| | | |
| | | private void incrementKeyCount() |
| | | { |
| | | keyCount.incrementAndGet(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Returns the file name associated with this index manager. |
| | | * |
| | | * @return The file name associated with this index manager. |
| | | */ |
| | | String getFileName() |
| | | String getBufferFileName() |
| | | { |
| | | return fileName; |
| | | return bufferFileName; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public int compareTo(IndexManager mgr) |
| | | { |
| | | return bufferIndexCount - mgr.bufferIndexCount; |
| | | return numberOfBuffers - mgr.numberOfBuffers; |
| | | } |
| | | |
| | | |
| | | |
| | | private int getNumberOfBuffers() |
| | | { |
| | | return numberOfBuffers; |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | private long latestCount; |
| | | |
| | | /** |
| | | |
| | | |
| | | /** |
| | | * Create a new import progress task. |
| | | * |
| | | * @param latestCount The latest count of entries processed in phase one. |
| | | * @param latestCount |
| | | * The latest count of entries processed in phase one. |
| | | */ |
| | | public SecondPhaseProgressTask (long latestCount) |
| | | public SecondPhaseProgressTask(long latestCount) |
| | | { |
| | | previousTime = System.currentTimeMillis(); |
| | | this.latestCount = latestCount; |
| | | try |
| | | { |
| | | previousStats = |
| | | rootContainer.getEnvironmentStats(new StatsConfig()); |
| | | previousStats = rootContainer.getEnvironmentStats(new StatsConfig()); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |