| | |
| | | * Shim that allows properly constructing an {@link Importer} without polluting |
| | | * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings. |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | static final class StrategyImpl implements ImportStrategy |
| | | { |
| | | private final PluggableBackendCfg backendCfg; |
| | | |
| | | public StrategyImpl(PluggableBackendCfg backendCfg) |
| | | StrategyImpl(PluggableBackendCfg backendCfg) |
| | | { |
| | | this.backendCfg = backendCfg; |
| | | } |
| | |
| | | */ |
| | | private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap = new ConcurrentHashMap<>(); |
| | | |
| | | /** Map of DB containers to index managers. Used to start phase 2. */ |
| | | /** The index managers used to start phase 2. */ |
| | | private final List<IndexManager> indexMgrList = new LinkedList<>(); |
| | | /** Map of DB containers to DN-based index managers. Used to start phase 2. */ |
| | | private final List<IndexManager> DNIndexMgrList = new LinkedList<>(); |
| | | |
| | | /** |
| | | * Futures used to indicate when the index file writers are done flushing |
| | |
| | | /** Number of phase one buffers. */ |
| | | private int phaseOneBufferCount; |
| | | |
| | | /** |
| | | * Create a new import job with the specified rebuild index config. |
| | | * |
| | | * @param rebuildConfig |
| | | * The rebuild index configuration. |
| | | * @param cfg |
| | | * The backend configuration. |
| | | * @throws InitializationException |
| | | * If a problem occurs during initialization. |
| | | * @throws StorageRuntimeException |
| | | * If an error occurred when opening the DB. |
| | | * @throws ConfigException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | Importer(RootContainer rootContainer, RebuildConfig rebuildConfig, PluggableBackendCfg cfg, |
| | | ServerContext serverContext) throws InitializationException, StorageRuntimeException, ConfigException |
| | | { |
| | |
| | | this.dnCache = null; |
| | | } |
| | | |
| | | /** |
| | | * Create a new import job with the specified ldif import config. |
| | | * |
| | | * @param importCfg |
| | | * The LDIF import configuration. |
| | | * @param backendCfg |
| | | * The backend configuration. |
| | | * @throws InitializationException |
| | | * If a problem occurs during initialization. |
| | | * @throws ConfigException |
| | | * If a problem occurs reading the configuration. |
| | | * @throws StorageRuntimeException |
| | | * If an error occurred when opening the DB. |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | Importer(RootContainer rootContainer, LDIFImportConfig importCfg, PluggableBackendCfg backendCfg, |
| | | ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException |
| | | { |
| | |
| | | int buffers; |
| | | while (true) |
| | | { |
| | | final List<IndexManager> allIndexMgrs = new ArrayList<>(DNIndexMgrList); |
| | | allIndexMgrs.addAll(indexMgrList); |
| | | final List<IndexManager> allIndexMgrs = new ArrayList<>(indexMgrList); |
| | | Collections.sort(allIndexMgrs, Collections.reverseOrder()); |
| | | |
| | | buffers = 0; |
| | |
| | | storage.close(); |
| | | try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport()) |
| | | { |
| | | List<Future<Void>> futures = new LinkedList<>(); |
| | | submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures); |
| | | submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures); |
| | | getAll(futures); |
| | | submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize); |
| | | } |
| | | finally |
| | | { |
| | |
| | | |
| | | private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, |
| | | org.opends.server.backends.pluggable.spi.Importer importer, |
| | | ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures) |
| | | ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize) throws InterruptedException |
| | | { |
| | | List<IndexDBWriteTask> tasks = new ArrayList<IndexDBWriteTask>(indexMgrs.size()); |
| | | for (IndexManager indexMgr : indexMgrs) |
| | | { |
| | | futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize))); |
| | | tasks.add(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)); |
| | | } |
| | | dbService.invokeAll(tasks); |
| | | } |
| | | |
| | | private static <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException |
| | |
| | | * |
| | | * @return true if the import operation can proceed with the provided entry, false otherwise |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | boolean dnSanityCheck(WriteableTransaction txn, Entry entry, Suffix suffix) |
| | | throws StorageRuntimeException, InterruptedException |
| | | { |
| | |
| | | } |
| | | |
| | | /** Why do we still need this if we are checking parents in the first phase? */ |
| | | @SuppressWarnings("javadoc") |
| | | boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet) |
| | | throws StorageRuntimeException |
| | | { |
| | |
| | | } |
| | | boolean isDN2ID = DN2ID_INDEX_NAME.equals(indexKey.getIndexID()); |
| | | IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN2ID, indexKey.getEntryLimit()); |
| | | if (isDN2ID) |
| | | { |
| | | DNIndexMgrList.add(indexMgr); |
| | | } |
| | | else |
| | | { |
| | | indexMgrList.add(indexMgr); |
| | | } |
| | | indexMgrList.add(indexMgr); |
| | | BlockingQueue<IndexOutputBuffer> newQueue = new ArrayBlockingQueue<>(phaseOneBufferCount); |
| | | ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr); |
| | | scratchFileWriterList.add(indexWriter); |
| | |
| | | |
| | | previousTime = latestTime; |
| | | |
| | | //Do DN index managers first. |
| | | for (IndexManager indexMgrDN : DNIndexMgrList) |
| | | { |
| | | indexMgrDN.printStats(deltaTime); |
| | | } |
| | | //Do non-DN index managers. |
| | | // DN index managers first. |
| | | printStats(deltaTime, true); |
| | | // non-DN index managers second |
| | | printStats(deltaTime, false); |
| | | } |
| | | |
| | | private void printStats(long deltaTime, boolean dn2id) |
| | | { |
| | | for (IndexManager indexMgr : indexMgrList) |
| | | { |
| | | indexMgr.printStats(deltaTime); |
| | | if (dn2id == indexMgr.isDN2ID()) |
| | | { |
| | | indexMgr.printStats(deltaTime); |
| | | } |
| | | } |
| | | } |
| | | } |