| | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.JebMessages.*; |
| | | import org.opends.server.admin.std.server.JEBackendCfg; |
| | | import org.opends.server.admin.std.server.LocalDBBackendCfg; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.config.ConfigException; |
| | | |
| | |
| | | /** |
| | | * The JE backend configuration. |
| | | */ |
| | | private JEBackendCfg config; |
| | | private LocalDBBackendCfg config; |
| | | |
| | | /** |
| | | * The root container used for this import job. |
| | |
| | | this.config = rootContainer.getConfiguration(); |
| | | this.mergePassNumber = 1; |
| | | this.entriesProcessed = 0; |
| | | this.importPassSize = config.getBackendImportPassSize(); |
| | | this.importPassSize = config.getImportPassSize(); |
| | | if (importPassSize <= 0) |
| | | { |
| | | importPassSize = Integer.MAX_VALUE; |
| | |
| | | { |
| | | // Divide the total buffer size by the number of threads |
| | | // and give that much to each thread. |
| | | int importThreadCount = config.getBackendImportThreadCount(); |
| | | long bufferSize = config.getBackendImportBufferSize() / |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | long bufferSize = config.getImportBufferSize() / |
| | | (importThreadCount*rootContainer.getBaseDNs().size()); |
| | | |
| | | message = INFO_JEB_IMPORT_THREAD_COUNT.get(importThreadCount); |
| | |
| | | startTime = System.currentTimeMillis(); |
| | | |
| | | // Create a temporary work directory. |
| | | File tempDir = getFileForPath(config.getBackendImportTempDirectory()); |
| | | File tempDir = getFileForPath(config.getImportTempDirectory()); |
| | | if(!tempDir.exists() && !tempDir.mkdir()) |
| | | { |
| | | Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get( |
| | |
| | | // For each configured attribute index. |
| | | for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes()) |
| | | { |
| | | int indexEntryLimit = config.getBackendIndexEntryLimit(); |
| | | int indexEntryLimit = config.getIndexEntryLimit(); |
| | | if(attrIndex.getConfiguration().getIndexEntryLimit() != null) |
| | | { |
| | | indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit(); |
| | |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, |
| | | id2Children, |
| | | config.getBackendIndexEntryLimit()); |
| | | config.getIndexEntryLimit()); |
| | | mergers.add(indexMergeThread); |
| | | |
| | | // Id2Subtree index. |
| | |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, |
| | | id2Subtree, |
| | | config.getBackendIndexEntryLimit()); |
| | | config.getIndexEntryLimit()); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | |
| | |
| | | private void startWorkerThreads() throws DatabaseException |
| | | { |
| | | // Create one set of worker threads for each base DN. |
| | | int importThreadCount = config.getBackendImportThreadCount(); |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | for (int i = 0; i < importThreadCount; i++) |
| | |
| | | |
| | | // Create an entry queue. |
| | | LinkedBlockingQueue<Entry> queue = |
| | | new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize()); |
| | | new LinkedBlockingQueue<Entry>(config.getImportQueueSize()); |
| | | importContext.setQueue(queue); |
| | | |
| | | // Set the include and exclude branches |