| | |
| | | { |
| | | this.threadCount = importConfiguration.getThreadCount(); |
| | | } |
| | | this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads |
| | | |
| | | // Determine the number of indexes. |
| | | this.indexCount = getTotalIndexCount(backendCfg); |
| | |
| | | { |
| | | return !importCfg.appendToExistingData() |
| | | && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1); |
| | | /* |
| | | * Why do we clear when there is only one baseDN? |
| | | * Any baseDN for which data is imported will be cleared anyway (see getSuffix()), |
| | | * so if there is only one baseDN for this backend, then clear it now. |
| | | */ |
| | | } |
| | | |
| | | private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) |
| | |
| | | * |
| | | * @param rootContainer |
| | | * The root container to use during the import. |
| | | * @param txn |
| | | * The database transaction |
| | | * @return An LDIF result. |
| | | * @throws Exception |
| | | * If the import failed |
| | | */ |
| | | public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception |
| | | public LDIFImportResult processImport(RootContainer rootContainer) throws Exception |
| | | { |
| | | this.rootContainer = rootContainer; |
| | | try { |
| | |
| | | logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(), |
| | | BUILD_ID, REVISION_NUMBER); |
| | | logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount); |
| | | initializeSuffixes(txn); |
| | | setIndexesTrusted(txn, false); |
| | | |
| | | final Storage storage = rootContainer.getStorage(); |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableStorage txn) throws Exception |
| | | { |
| | | initializeSuffixes(txn); |
| | | setIndexesTrusted(txn, false); |
| | | } |
| | | }); |
| | | |
| | | final long startTime = System.currentTimeMillis(); |
| | | importPhaseOne(txn); |
| | | importPhaseOne(); |
| | | isPhaseOneDone = true; |
| | | final long phaseOneFinishTime = System.currentTimeMillis(); |
| | | |
| | |
| | | } |
| | | final long phaseTwoFinishTime = System.currentTimeMillis(); |
| | | |
| | | setIndexesTrusted(txn, true); |
| | | switchEntryContainers(txn); |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableStorage txn) throws Exception |
| | | { |
| | | setIndexesTrusted(txn, true); |
| | | switchEntryContainers(txn); |
| | | } |
| | | }); |
| | | recursiveDelete(tempDir); |
| | | final long finishTime = System.currentTimeMillis(); |
| | | final long importTime = finishTime - startTime; |
| | |
| | | * The scratch files will be read by phaseTwo to perform on-disk merge</li> |
| | | * </ol> |
| | | */ |
| | | private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException |
| | | private void importPhaseOne() throws Exception |
| | | { |
| | | initializeIndexBuffers(); |
| | | |
| | |
| | | bufferSortService = Executors.newFixedThreadPool(threadCount); |
| | | final ExecutorService execService = Executors.newFixedThreadPool(threadCount); |
| | | |
| | | execService.submit(new MigrateExistingTask(txn)).get(); |
| | | final Storage storage = rootContainer.getStorage(); |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableStorage txn) throws Exception |
| | | { |
| | | execService.submit(new MigrateExistingTask(txn)).get(); |
| | | } |
| | | }); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); |
| | | if (importConfiguration.appendToExistingData() |
| | |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new AppendReplaceTask(txn)); |
| | | tasks.add(new AppendReplaceTask(storage.getWriteableStorage())); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new ImportTask(txn)); |
| | | tasks.add(new ImportTask(storage.getWriteableStorage())); |
| | | } |
| | | } |
| | | getAll(execService.invokeAll(tasks)); |
| | | tasks.clear(); |
| | | |
| | | execService.submit(new MigrateExcludedTask(txn)).get(); |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableStorage txn) throws Exception |
| | | { |
| | | execService.submit(new MigrateExcludedTask(txn)).get(); |
| | | } |
| | | }); |
| | | |
| | | stopScratchFileWriters(); |
| | | getAll(scratchFileWriterFutures); |
| | |
| | | isCanceled = true; |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | txn.close(); |
| | | } |
| | | } |
| | | |
| | | void processEntry(Entry entry, Suffix suffix) |
| | |
| | | isCanceled = true; |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | txn.close(); |
| | | } |
| | | } |
| | | |
| | | void processEntry(Entry entry, EntryID entryID, Suffix suffix) |
| | |
| | | { |
| | | private final IndexManager indexMgr; |
| | | private final int cacheSize; |
| | | /** indexID => DNState map */ |
| | | private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>(); |
| | | private final Semaphore permits; |
| | | private final int maxPermits; |
| | |
| | | |
| | | private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException |
| | | { |
| | | DNState dnState; |
| | | if (!dnStateMap.containsKey(indexID)) |
| | | DNState dnState = dnStateMap.get(indexID); |
| | | if (dnState == null) |
| | | { |
| | | dnState = new DNState(indexIDToECMap.get(indexID)); |
| | | dnStateMap.put(indexID, dnState); |
| | | } |
| | | else |
| | | { |
| | | dnState = dnStateMap.get(indexID); |
| | | } |
| | | if (dnState.checkParent(txn, idSet)) |
| | | { |
| | | dnState.writeToDN2ID(idSet); |
| | |
| | | /** |
| | |  * Answers a call made on the configuration proxy from the canned values in {@code returnValues}. |
| | |  * <p> |
| | |  * Calls whose name starts with "add" or "remove" and ends with "ChangeListener" are ignored |
| | |  * and return {@code null}. Any other call returns the non-null value registered under the |
| | |  * method name. |
| | |  * |
| | |  * @param proxy |
| | |  *          The proxy instance the method was invoked on (never stringified here) |
| | |  * @param method |
| | |  *          The invoked method; only its name is used for the lookup |
| | |  * @param args |
| | |  *          The call arguments, reported in the error message only |
| | |  * @return The registered value, or {@code null} for an ignored change-listener call |
| | |  * @throws IllegalArgumentException |
| | |  *           If no non-null value is registered for the method name |
| | |  */ |
| | | @Override |
| | | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable |
| | | { |
| | |   final String methodName = method.getName(); |
| | |   if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener")) |
| | |   { |
| | |     // ignore calls to (add|remove)*ChangeListener() methods |
| | |     return null; |
| | |   } |
| | | |
| | |   final Object returnValue = returnValues.get(methodName); |
| | |   if (returnValue != null) |
| | |   { |
| | |     return returnValue; |
| | |   } |
| | |   // Do not concatenate 'proxy' into the message: its toString() is dispatched back to this |
| | |   // handler and would end up on this same unhandled-method path. |
| | |   throw new IllegalArgumentException("Unhandled method call on proxy (" |
| | |       + BackendCfgHandler.class.getSimpleName() |
| | |       + ") for method (" + method |
| | |       + ") with arguments (" + Arrays.toString(args) + ")"); |
| | | } |
| | |
| | | */ |
| | | private final class TmpEnv implements DNCache |
| | | { |
| | | private final Storage storage; |
| | | private WriteableStorage txn; |
| | | private org.opends.server.backends.pluggable.spi.Importer importer; |
| | | private static final String DB_NAME = "dn_cache"; |
| | | private final TreeName dnCache = new TreeName("", DB_NAME); |
| | | private final Storage storage; |
| | | private final WriteableStorage txn; |
| | | |
| | | /** |
| | | * Create a temporary DB environment and database to be used as a cache of |
| | |
| | | final Map<String, Object> returnValues = new HashMap<String, Object>(); |
| | | returnValues.put("getDBDirectory", envPath.getAbsolutePath()); |
| | | returnValues.put("getBackendId", DB_NAME); |
| | | // returnValues.put("getDBCacheSize", 10L); |
| | | returnValues.put("getDBCacheSize", 0L); |
| | | returnValues.put("getDBCachePercent", 10); |
| | | returnValues.put("isDBTxnNoSync", true); |
| | | returnValues.put("getDBDirectoryPermissions", "700"); |
| | | returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB)); |
| | | returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB)); |
| | | try |
| | | { |
| | | returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config")); |
| | | storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues), |
| | | DirectoryServer.getInstance().getServerContext()); |
| | | storage.open(); |
| | | txn = storage.getWriteableStorage(); |
| | | txn.openTree(dnCache); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | |
| | | importer.createTree(dnCache); |
| | | } |
| | | |
| | | private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues) |
| | |
| | | { |
| | | try |
| | | { |
| | | importer.close(); |
| | | storage.close(); |
| | | } |
| | | finally |
| | | { |