| | |
| | | envConfig.setConfigParam("je.log.totalBufferBytes", "30000000"); |
| | | envConfig.setConfigParam("je.log.fileMax", "100000000"); |
| | | */ |
| | | rootContainer = new RootContainer(config, backend); |
| | | if (ldifImportConfig.appendToExistingData()) |
| | | |
| | | // Create an LDIF reader. Throws an exception if the file does not exist. |
| | | reader = new LDIFReader(ldifImportConfig); |
| | | |
| | | int msgID; |
| | | String message; |
| | | long startTime; |
| | | try |
| | | { |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, true, true, true, false); |
| | | } |
| | | else |
| | | { |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, false, false, false, false); |
| | | } |
| | | rootContainer = new RootContainer(config, backend); |
| | | if (ldifImportConfig.appendToExistingData()) |
| | | { |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, true, true, true, false); |
| | | } |
| | | else |
| | | { |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, false, false, false, false); |
| | | } |
| | | |
| | | if (!ldifImportConfig.appendToExistingData()) |
| | | { |
| | | // We have the writer lock on the environment, now delete the |
| | | // environment and re-open it. Only do this when we are |
| | | // importing to all the base DNs in the backend. |
| | | rootContainer.close(); |
| | | EnvManager.removeFiles(config.getBackendDirectory().getPath()); |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, false, false, false, false); |
| | | } |
| | | if (!ldifImportConfig.appendToExistingData()) |
| | | { |
| | | // We have the writer lock on the environment, now delete the |
| | | // environment and re-open it. Only do this when we are |
| | | // importing to all the base DNs in the backend. |
| | | rootContainer.close(); |
| | | EnvManager.removeFiles(config.getBackendDirectory().getPath()); |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, false, false, false, false); |
| | | } |
| | | |
| | | // Divide the total buffer size by the number of threads |
| | | // and give that much to each thread. |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | long bufferSize = config.getImportBufferSize() / |
| | | (importThreadCount*config.getBaseDNs().length); |
| | | // Divide the total buffer size by the number of threads |
| | | // and give that much to each thread. |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | long bufferSize = config.getImportBufferSize() / |
| | | (importThreadCount*config.getBaseDNs().length); |
| | | |
| | | int msgID = MSGID_JEB_IMPORT_THREAD_COUNT; |
| | | String message = getMessage(msgID, importThreadCount); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | msgID = MSGID_JEB_IMPORT_THREAD_COUNT; |
| | | message = getMessage(msgID, importThreadCount); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | msgID = MSGID_JEB_IMPORT_BUFFER_SIZE; |
| | | message = getMessage(msgID, bufferSize); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | msgID = MSGID_JEB_IMPORT_BUFFER_SIZE; |
| | | message = getMessage(msgID, bufferSize); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG; |
| | | message = getMessage(msgID, |
| | | rootContainer.getEnvironmentConfig().toString()); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG; |
| | | message = getMessage(msgID, |
| | | rootContainer.getEnvironmentConfig().toString()); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | |
| | | DebugLogger.debugInfo( |
| | | DebugLogger.debugInfo( |
| | | rootContainer.getEnvironmentConfig().toString()); |
| | | |
| | | |
| | | rootContainer.openEntryContainers(config.getBaseDNs()); |
| | | rootContainer.openEntryContainers(config.getBaseDNs()); |
| | | |
| | | // Create the import contexts for each base DN. |
| | | DN baseDN; |
| | | // Create the import contexts for each base DN. |
| | | DN baseDN; |
| | | |
| | | for (EntryContainer entryContainer : rootContainer.getEntryContainers()) |
| | | { |
| | | baseDN = entryContainer.getBaseDN(); |
| | | |
| | | // Create an import context. |
| | | ImportContext importContext = new ImportContext(); |
| | | importContext.setBufferSize(bufferSize); |
| | | importContext.setConfig(config); |
| | | importContext.setLDIFImportConfig(this.ldifImportConfig); |
| | | |
| | | importContext.setBaseDN(baseDN); |
| | | importContext.setContainerName(entryContainer.getContainerName()); |
| | | importContext.setEntryContainer(entryContainer); |
| | | importContext.setBufferSize(bufferSize); |
| | | |
| | | // Create an entry queue. |
| | | LinkedBlockingQueue<Entry> queue = |
| | | new LinkedBlockingQueue<Entry>(config.getImportQueueSize()); |
| | | importContext.setQueue(queue); |
| | | |
| | | importMap.put(baseDN, importContext); |
| | | } |
| | | |
| | | // Make a note of the time we started. |
| | | long startTime = System.currentTimeMillis(); |
| | | |
| | | try |
| | | { |
| | | // Create a temporary work directory. |
| | | File tempDir = new File(config.getImportTempDirectory()); |
| | | tempDir.mkdir(); |
| | | if (tempDir.listFiles() != null) |
| | | for (EntryContainer entryContainer : rootContainer.getEntryContainers()) |
| | | { |
| | | for (File f : tempDir.listFiles()) |
| | | { |
| | | f.delete(); |
| | | } |
| | | baseDN = entryContainer.getBaseDN(); |
| | | |
| | | // Create an import context. |
| | | ImportContext importContext = new ImportContext(); |
| | | importContext.setBufferSize(bufferSize); |
| | | importContext.setConfig(config); |
| | | importContext.setLDIFImportConfig(this.ldifImportConfig); |
| | | importContext.setLDIFReader(reader); |
| | | |
| | | importContext.setBaseDN(baseDN); |
| | | importContext.setContainerName(entryContainer.getContainerName()); |
| | | importContext.setEntryContainer(entryContainer); |
| | | importContext.setBufferSize(bufferSize); |
| | | |
| | | // Create an entry queue. |
| | | LinkedBlockingQueue<Entry> queue = |
| | | new LinkedBlockingQueue<Entry>(config.getImportQueueSize()); |
| | | importContext.setQueue(queue); |
| | | |
| | | importMap.put(baseDN, importContext); |
| | | } |
| | | |
| | | // Make a note of the time we started. |
| | | startTime = System.currentTimeMillis(); |
| | | |
| | | try |
| | | { |
| | | importedCount = 0; |
| | | int passNumber = 1; |
| | | boolean moreData = true; |
| | | while (moreData) |
| | | // Create a temporary work directory. |
| | | File tempDir = new File(config.getImportTempDirectory()); |
| | | tempDir.mkdir(); |
| | | if (tempDir.listFiles() != null) |
| | | { |
| | | moreData = processLDIF(); |
| | | if (moreData) |
| | | for (File f : tempDir.listFiles()) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE; |
| | | message = getMessage(msgID, passNumber++); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | f.delete(); |
| | | } |
| | | else |
| | | } |
| | | |
| | | try |
| | | { |
| | | importedCount = 0; |
| | | int passNumber = 1; |
| | | boolean moreData = true; |
| | | while (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | moreData = processLDIF(); |
| | | if (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE; |
| | | message = getMessage(msgID, passNumber++); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | |
| | | |
| | | long mergeStartTime = System.currentTimeMillis(); |
| | | merge(); |
| | | long mergeEndTime = System.currentTimeMillis(); |
| | | long mergeStartTime = System.currentTimeMillis(); |
| | | merge(); |
| | | long mergeEndTime = System.currentTimeMillis(); |
| | | |
| | | if (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | if (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | tempDir.delete(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | tempDir.delete(); |
| | | rootContainer.close(); |
| | | |
| | | // Sync the environment to disk. |
| | | msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | rootContainer.close(); |
| | | |
| | | // Sync the environment to disk. |
| | | msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | reader.close(); |
| | | } |
| | | |
| | | long finishTime = System.currentTimeMillis(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Open a reader for the LDIF file and create a set of worker threads, one |
| | | * set for each base DN. Read each entry from the LDIF and determine which |
| | | * Create a set of worker threads, one set for each base DN. |
| | | * Read each entry from the LDIF and determine which |
| | | * base DN the entry belongs to. Write the dn2id database, then put the |
| | | * entry on the appropriate queue for the worker threads to consume. |
| | | * Record the entry count for each base DN when all entries have been |
| | | * processed. |
| | | * |
| | | * @return true if there is more data to be read from the LDIF file (the import |
| | | * @return true if there is more data to be read from the LDIF file (the import |
| | | * pass size was reached), false if the entire LDIF file has been read. |
| | | * |
| | | * @throws JebException If an error occurs in the JE backend. |
| | |
| | | { |
| | | boolean moreData = false; |
| | | |
| | | // Create an LDIF reader if necessary. |
| | | if (reader == null) |
| | | ArrayList<ImportThread> threads; |
| | | |
| | | // Create one set of worker threads for each base DN. |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | threads = new ArrayList<ImportThread>(importThreadCount*importMap.size()); |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | reader = new LDIFReader(ldifImportConfig); |
| | | for (ImportContext ic : importMap.values()) |
| | | for (int i = 0; i < importThreadCount; i++) |
| | | { |
| | | ic.setLDIFReader(reader); |
| | | ImportThread t = new ImportThread(ic, i); |
| | | t.setUncaughtExceptionHandler(this); |
| | | threads.add(t); |
| | | |
| | | t.start(); |
| | | } |
| | | } |
| | | |
| | | ArrayList<ImportThread> threads; |
| | | try |
| | | { |
| | | // Create one set of worker threads for each base DN. |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | threads = new ArrayList<ImportThread>(importThreadCount*importMap.size()); |
| | | for (ImportContext ic : importMap.values()) |
| | | // Create a counter to use to determine whether we've hit the import |
| | | // pass size. |
| | | int entriesProcessed = 0; |
| | | int importPassSize = config.getImportPassSize(); |
| | | if (importPassSize <= 0) |
| | | { |
| | | for (int i = 0; i < importThreadCount; i++) |
| | | { |
| | | ImportThread t = new ImportThread(ic, i); |
| | | t.setUncaughtExceptionHandler(this); |
| | | threads.add(t); |
| | | |
| | | t.start(); |
| | | } |
| | | importPassSize = Integer.MAX_VALUE; |
| | | } |
| | | |
| | | // Start a timer for the progress report. |
| | | Timer timer = new Timer(); |
| | | TimerTask progressTask = new ImportJob.ProgressTask(); |
| | | timer.scheduleAtFixedRate(progressTask, progressInterval, |
| | | progressInterval); |
| | | |
| | | try |
| | | { |
| | | // Create a counter to use to determine whether we've hit the import |
| | | // pass size. |
| | | int entriesProcessed = 0; |
| | | int importPassSize = config.getImportPassSize(); |
| | | if (importPassSize <= 0) |
| | | { |
| | | importPassSize = Integer.MAX_VALUE; |
| | | } |
| | | |
| | | // Start a timer for the progress report. |
| | | Timer timer = new Timer(); |
| | | TimerTask progressTask = new ImportJob.ProgressTask(); |
| | | timer.scheduleAtFixedRate(progressTask, progressInterval, |
| | | progressInterval); |
| | | |
| | | try |
| | | { |
| | | do |
| | | { |
| | | try |
| | | { |
| | | // Read the next entry. |
| | | Entry entry = reader.readEntry(); |
| | | |
| | | // Check for end of file. |
| | | if (entry == null) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | // Route it according to base DN. |
| | | ImportContext importContext = getImportConfig(entry.getDN()); |
| | | |
| | | processEntry(importContext, entry); |
| | | |
| | | entriesProcessed++; |
| | | if (entriesProcessed >= importPassSize) |
| | | { |
| | | moreData = true; |
| | | break; |
| | | } |
| | | } |
| | | catch (LDIFException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } while (true); |
| | | |
| | | // Wait for the queues to be drained. |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | while (ic.getQueue().size() > 0) |
| | | { |
| | | try |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (Exception e) |
| | | { |
| | | // No action needed. |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | finally |
| | | { |
| | | timer.cancel(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // Order the threads to stop. |
| | | for (ImportThread t : threads) |
| | | { |
| | | t.stopProcessing(); |
| | | } |
| | | |
| | | // Wait for each thread to stop. |
| | | for (ImportThread t : threads) |
| | | do |
| | | { |
| | | try |
| | | { |
| | | t.join(); |
| | | importedCount += t.getImportedCount(); |
| | | // Read the next entry. |
| | | Entry entry = reader.readEntry(); |
| | | |
| | | // Check for end of file. |
| | | if (entry == null) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | // Route it according to base DN. |
| | | ImportContext importContext = getImportConfig(entry.getDN()); |
| | | |
| | | processEntry(importContext, entry); |
| | | |
| | | entriesProcessed++; |
| | | if (entriesProcessed >= importPassSize) |
| | | { |
| | | moreData = true; |
| | | break; |
| | | } |
| | | } |
| | | catch (InterruptedException ie) |
| | | catch (LDIFException e) |
| | | { |
| | | // No action needed? |
| | | if (debugEnabled()) |
| | | { |
| | | debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } while (true); |
| | | |
| | | // Wait for the queues to be drained. |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | while (ic.getQueue().size() > 0) |
| | | { |
| | | try |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (Exception e) |
| | | { |
| | | // No action needed. |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | finally |
| | | { |
| | | timer.cancel(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | if (! moreData) |
| | | // Order the threads to stop. |
| | | for (ImportThread t : threads) |
| | | { |
| | | reader.close(); |
| | | t.stopProcessing(); |
| | | } |
| | | |
| | | // Wait for each thread to stop. |
| | | for (ImportThread t : threads) |
| | | { |
| | | try |
| | | { |
| | | t.join(); |
| | | importedCount += t.getImportedCount(); |
| | | } |
| | | catch (InterruptedException ie) |
| | | { |
| | | // No action needed? |
| | | } |
| | | } |
| | | } |
| | | |