@@ ... @@
     }
   }

-  private void importPhaseTwo() throws InterruptedException, ExecutionException
+  private void importPhaseTwo() throws Exception
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ ... @@
    * Performs on-disk merge by reading several scratch files at once
    * and writing their ordered content into the target indexes.
    */
-  private void processIndexFiles() throws InterruptedException, ExecutionException
+  private void processIndexFiles() throws Exception
   {
     if (bufferCount.get() == 0)
     {
@@ ... @@
     Semaphore permits = new Semaphore(buffers);

     // Start DN processing first.
-    List<Future<Void>> futures = new LinkedList<>();
-    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    getAll(futures);
+    Storage storage = rootContainer.getStorage();
+    storage.close();
+    try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
+    {
+      List<Future<Void>> futures = new LinkedList<>();
+      submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      getAll(futures);
+    }
+    finally
+    {
+      storage.open();
+    }

     shutdownAll(dbService);
   }
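The added block follows a close/import/reopen pattern: the regular storage handle is closed, a bulk importer stays open in try-with-resources for the whole on-disk merge, and the storage is reopened in the finally block even if the merge fails. A minimal sketch of that shape, using simplified stand-in interfaces rather than the real org.opends.server.backends.pluggable.spi types:

```java
import java.util.function.Consumer;

// Simplified stand-ins for illustration only; not the real pluggable-backend SPI.
interface BulkImporter extends AutoCloseable
{
  @Override
  void close();
}

interface BackendStorage
{
  void close();
  void open() throws Exception;
  BulkImporter startImport() throws Exception;
}

final class PhaseTwoShape
{
  static void runPhaseTwo(BackendStorage storage, Consumer<BulkImporter> mergeWork) throws Exception
  {
    storage.close();                                    // release the normal handle
    try (BulkImporter importer = storage.startImport()) // bulk-load mode for the merge
    {
      mergeWork.accept(importer);                       // all index writes go through the importer
    }
    finally
    {
      storage.open();                                   // always restore the normal handle
    }
  }
}
```

Reopening the storage in the finally block keeps the backend usable even when the import fails partway through.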

-  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
-      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
+  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
+      org.opends.server.backends.pluggable.spi.Importer importer,
+      ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
   {
     for (IndexManager indexMgr : indexMgrs)
     {
-      futures.add(dbService.submit(
-          new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
+      futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
     }
   }
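submitIndexDBWriteTasks fans one IndexDBWriteTask per index manager out to the dbService executor and collects the futures, while the shared Semaphore caps how many scratch-file buffers the tasks may hold at once. A sketch of that fan-out/join shape with a hypothetical task body (only the concurrency structure mirrors the code above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

final class FanOutJoinSketch
{
  static void runAll(ExecutorService dbService, int taskCount, int maxBuffers)
      throws InterruptedException, ExecutionException
  {
    Semaphore permits = new Semaphore(maxBuffers);
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 0; i < taskCount; i++)
    {
      futures.add(dbService.submit(() ->
      {
        permits.acquire();          // block until a buffer permit is available
        try
        {
          // ... merge one index's scratch files here (hypothetical work) ...
          return null;
        }
        finally
        {
          permits.release();
        }
      }));
    }
    for (Future<Void> future : futures)
    {
      future.get();                 // getAll()-style join: propagate the first failure
    }
  }
}
```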

@@ ... @@
    */
   private final class IndexDBWriteTask implements Callable<Void>
   {
-    private final Storage storage;
+    private final org.opends.server.backends.pluggable.spi.Importer importer;
     private final IndexManager indexMgr;
     private final int cacheSize;
     /** indexID => DNState map */
@@ ... @@
     /**
      * Creates a new index DB writer.
      *
+     * @param importer
+     *          The importer
      * @param indexMgr
      *          The index manager.
-     * @param storage
-     *          Where to store data
      * @param permits
      *          The semaphore used for restricting the number of buffer allocations.
      * @param maxPermits
@@ ... @@
      * @param cacheSize
      *          The buffer cache size.
      */
-    public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
+    public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
+        Semaphore permits, int maxPermits, int cacheSize)
     {
-      this.storage = storage;
+      this.importer = importer;
       this.indexMgr = indexMgr;
       this.permits = permits;
       this.maxPermits = maxPermits;
@@ ... @@
     }

     /** Finishes this task. */
-    private void endWriteTask(WriteableTransaction txn)
+    private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
     {
       isRunning = false;

@@ ... @@
         {
           for (DNState dnState : dnStateMap.values())
           {
-            dnState.flush(txn);
+            dnState.finalFlush(importer);
           }

           if (!isCanceled)
           {
             logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ ... @@
     @Override
     public Void call() throws Exception
     {
-      storage.write(new WriteOperation()
-      {
-        @Override
-        public void run(WriteableTransaction txn) throws Exception
-        {
-          call0(txn);
-        }
-      });
+      call0(importer);
       return null;
     }
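The change in call() is the core of this diff: previously every merge ran inside a storage.write(WriteOperation) callback and touched the trees through a WriteableTransaction, whereas now the task already holds the long-lived importer and calls it directly. A rough sketch of the two shapes, with simplified stand-in types rather than the real SPI:

```java
// Simplified stand-ins for illustration only.
interface WriteableTxn {}
interface ImportHandle {}

interface WriteOp
{
  void run(WriteableTxn txn) throws Exception;
}

interface TxnStorage
{
  void write(WriteOp op) throws Exception;
}

final class CallShapeSketch
{
  // Old shape: the merge runs inside a transaction callback owned by the storage.
  static void transactional(TxnStorage storage) throws Exception
  {
    storage.write(txn ->
    {
      // merge work; every read/write goes through txn
    });
  }

  // New shape: the merge uses the bulk import handle given to the task directly.
  static void bulk(ImportHandle importer)
  {
    // merge work; every read/write goes through importer
  }
}
```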

-    private void call0(WriteableTransaction txn) throws Exception
+    private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
     {
       if (isCanceled)
       {
@@ ... @@
             {
               if (previousRecord != null)
               {
-                addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+                addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
               }

               // this is a new record
@@ ... @@

           if (previousRecord != null)
           {
-            addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+            addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
           }
         }
       }
@@ ... @@
       }
       finally
       {
-        endWriteTask(txn);
+        endWriteTask(importer);
       }
     }
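call0() walks the merged, key-ordered stream of scratch-file records, pooling the IDs for the current key and flushing them through addToDB() whenever the key changes, plus one final flush after the loop. A self-contained sketch of that group-by-key batching over an already-sorted iterator (the record type and the flush target are hypothetical):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;

final class GroupByKeySketch
{
  static final class Rec
  {
    final String key;
    final long id;

    Rec(String key, long id)
    {
      this.key = key;
      this.id = id;
    }
  }

  /** Records must arrive sorted by key, like the merged scratch-file stream. */
  static void merge(Iterator<Rec> sorted, Map<String, TreeSet<Long>> target)
  {
    String previousKey = null;
    TreeSet<Long> ids = new TreeSet<>();
    while (sorted.hasNext())
    {
      Rec r = sorted.next();
      if (previousKey != null && !previousKey.equals(r.key))
      {
        target.put(previousKey, new TreeSet<>(ids));   // addToDB()-style flush for the finished key
        ids.clear();
      }
      ids.add(r.id);                                   // accumulate this record's ID under the current key
      previousKey = r.key;
    }
    if (previousKey != null)
    {
      target.put(previousKey, ids);                    // final flush after the loop
    }
  }
}
```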

@@ ... @@
       return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
     }

-    private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
-        throws DirectoryException
+    private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
+        ImportIDSet deleteSet) throws DirectoryException
     {
       keyCount.incrementAndGet();
       if (indexMgr.isDN2ID())
       {
-        addDN2ID(txn, indexID, insertSet);
+        addDN2ID(importer, indexID, insertSet);
       }
       else
       {
         if (!deleteSet.isDefined() || deleteSet.size() > 0)
         {
           final Index index = indexIDToIndexMap.get(indexID);
-          index.importRemove(txn, deleteSet);
+          index.importRemove(importer, deleteSet);
         }
         if (!insertSet.isDefined() || insertSet.size() > 0)
         {
           final Index index = indexIDToIndexMap.get(indexID);
-          index.importPut(txn, insertSet);
+          index.importPut(importer, insertSet);
         }
       }
     }

-    private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
+    private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
+        throws DirectoryException
     {
       DNState dnState = dnStateMap.get(indexID);
       if (dnState == null)
       {
         dnState = new DNState(indexIDToECMap.get(indexID));
         dnStateMap.put(indexID, dnState);
       }
-      if (dnState.checkParent(txn, idSet))
+      if (dnState.checkParent(importer, idSet))
       {
-        dnState.writeToDN2ID(txn, idSet.getKey());
+        dnState.writeToDN2ID(importer, idSet.getKey());
       }
     }
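addDN2ID() keeps one DNState per index ID, and checkParent() looks the parent DN up in dn2id before the child's own key is written; the check only works if parent keys are written before their children, which the sorted merge order is presumably providing here. A small sketch of that rule, with a sorted map standing in for the dn2id tree and hypothetical names throughout:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class Dn2IdParentCheckSketch
{
  /** Writes the child key only if its parent is already present; suffix entries have no parent. */
  static boolean writeIfParentPresent(NavigableMap<String, Long> dn2id,
      String dnKey, String parentKey, long entryId)
  {
    if (parentKey != null && !dn2id.containsKey(parentKey))
    {
      return false;                 // missing parent: skip (append mode may choose to tolerate this)
    }
    dn2id.put(dnKey, entryId);      // writeToDN2ID()-style insert
    return true;
  }

  public static void main(String[] args)
  {
    NavigableMap<String, Long> dn2id = new TreeMap<>();
    writeIfParentPresent(dn2id, "dc=example,dc=com", null, 1L);                          // suffix entry
    writeIfParentPresent(dn2id, "ou=people,dc=example,dc=com", "dc=example,dc=com", 2L); // child entry
  }
}
```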

@@ ... @@
      * This class is used by an index DB merge thread performing DN processing
      * to keep track of the state of individual DN2ID index processing.
      */
-    final class DNState
+    private final class DNState
     {
       private static final int DN_STATE_CACHE_SIZE = 64 * KB;

@@ ... @@
       private ByteSequence parentDN;
       private final ByteStringBuilder lastDN = new ByteStringBuilder();
       private EntryID parentID, lastID, entryID;
+      private long totalNbEntries;

-      DNState(EntryContainer entryContainer)
+      private DNState(EntryContainer entryContainer)
       {
         this.entryContainer = entryContainer;
         dn2id = entryContainer.getDN2ID().getName();
@@ ... @@
       }

       /** Why do we still need this if we are checking parents in the first phase? */
-      private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
+      boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
+          throws StorageRuntimeException
       {
         entryID = idSet.iterator().next();
         parentDN = getParent(idSet.getKey());
@@ ... @@
         // If null is returned then this is a suffix DN.
         if (parentDN != null)
         {
-          parentID = get(txn, dn2id, parentDN);
+          parentID = get(importer, dn2id, parentDN);
           if (parentID == null)
           {
             // We have a missing parent. Maybe parent checking was turned off?
@@ ... @@
         return importCfg != null && importCfg.appendToExistingData();
       }

-      EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
+      private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
+          throws StorageRuntimeException
       {
-        ByteString value = txn.read(dn2id, dn);
+        ByteString value = importer.read(dn2id, dn);
         return value != null ? new EntryID(value) : null;
       }

-      public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
+      void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
+          throws DirectoryException
       {
-        txn.put(dn2id, key, entryID.toByteString());
+        importer.put(dn2id, key, entryID.toByteString());
         indexMgr.addTotDNCount(1);
         if (parentID != null)
         {
-          incrementChildrenCounter(txn);
+          incrementChildrenCounter(importer);
         }
       }

-      private void incrementChildrenCounter(WriteableTransaction txn)
+      private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
       {
         final AtomicLong counter = getId2childrenCounter();
         counter.incrementAndGet();
         if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
         {
-          flush(txn);
+          flush(importer);
         }
       }

-      private void flush(WriteableTransaction txn)
+      private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
       {
         for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
         {
-          entryContainer.getID2ChildrenCount()
-              .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
+          final EntryID entryID = childrenCounter.getKey();
+          final long totalForEntryID = childrenCounter.getValue().get();
+          totalNbEntries += totalForEntryID;
+          entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
         }
         id2childrenCountTree.clear();
       }
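The child counters are kept in the in-memory id2childrenCountTree and only pushed out when the tree grows past DN_STATE_CACHE_SIZE or at the end of the task; the new code also accumulates totalNbEntries so finalFlush() can record a grand total. A sketch of that cache-and-flush counter, with a BiConsumer standing in for id2childrenCount.importPut(importer, ...):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

final class CachedChildCountSketch
{
  private final TreeMap<Long, AtomicLong> cache = new TreeMap<>();
  private final int flushThreshold;
  private final BiConsumer<Long, Long> sink;   // stand-in for id2childrenCount.importPut(importer, id, count)
  private long totalEntries;

  CachedChildCountSketch(int flushThreshold, BiConsumer<Long, Long> sink)
  {
    this.flushThreshold = flushThreshold;
    this.sink = sink;
  }

  void incrementChildren(long parentId)
  {
    cache.computeIfAbsent(parentId, k -> new AtomicLong()).incrementAndGet();
    if (cache.size() > flushThreshold)
    {
      flush();
    }
  }

  void flush()
  {
    for (Map.Entry<Long, AtomicLong> entry : cache.entrySet())
    {
      long count = entry.getValue().get();
      totalEntries += count;                   // accumulated for the final total
      sink.accept(entry.getKey(), count);
    }
    cache.clear();
  }

  long finalFlush()
  {
    flush();
    return totalEntries;                       // importPutTotalCount()-style grand total
  }
}
```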

+
+      void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
+      {
+        flush(importer);
+
+        entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
+      }
     }
   }

@@ ... @@
       indexKeyQueueMap.clear();
     }

-    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
+    private void rebuildIndexesPhaseTwo() throws Exception
     {
       final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
       try