| | |
| | | @Override |
| | | public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception |
| | | { |
| | | final int threadCount = |
| | | importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount(); |
| | | final int indexCount = getIndexCount(); |
| | | |
| | | final int nbRequiredBuffers = threadCount * indexCount * 2; |
| | | logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION); |
| | | logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount); |
| | | |
| | | final long startTime = System.currentTimeMillis(); |
| | | final OnDiskMergeImporter importer; |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | final LDIFReaderSource source = |
| | | new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount); |
| | | final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory()); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport(); |
| | | final BufferPool bufferPool = newBufferPool(nbRequiredBuffers)) |
| | | final int maxThreadCount = importConfig.getThreadCount() == 0 |
| | | ? getDefaultNumberOfThread() |
| | | : importConfig.getThreadCount(); |
| | | final int nbBuffersPerThread = 2 * getIndexCount(); |
| | | try(final BufferPool bufferPool = newBufferPool(maxThreadCount, nbBuffersPerThread)) |
| | | { |
| | | final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers(); |
| | | final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation() |
| | | ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter) |
| | | : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter); |
| | | |
| | | importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy); |
| | | importer.doImport(source); |
| | | } |
| | | finally |
| | | { |
| | | sorter.shutdownNow(); |
| | | if (OperatingSystem.isWindows()) |
| | | final int threadCount = bufferPool.size() / nbBuffersPerThread; |
| | | logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount); |
| | | final OnDiskMergeImporter importer; |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | final LDIFReaderSource source = |
| | | new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount); |
| | | final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory()); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport()) |
| | | { |
| | | // Try to force the JVM to close mmap()ed files so that they can be deleted. |
| | | // (see http://bugs.java.com/view_bug.do?bug_id=4715154) |
| | | System.gc(); |
| | | Runtime.getRuntime().runFinalization(); |
| | | final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers(); |
| | | final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation() |
| | | ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter) |
| | | : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter); |
| | | |
| | | importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy); |
| | | importer.doImport(source); |
| | | } |
| | | recursiveDelete(tempDir); |
| | | } |
| | | logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis() |
| | | / 1000, importer.getPhaseTwoTimeInMillis() / 1000); |
| | | finally |
| | | { |
| | | sorter.shutdownNow(); |
| | | if (OperatingSystem.isWindows()) |
| | | { |
| | | // Try to force the JVM to close mmap()ed files so that they can be deleted. |
| | | // (see http://bugs.java.com/view_bug.do?bug_id=4715154) |
| | | System.gc(); |
| | | Runtime.getRuntime().runFinalization(); |
| | | } |
| | | recursiveDelete(tempDir); |
| | | } |
| | | logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis() |
| | | / 1000, importer.getPhaseTwoTimeInMillis() / 1000); |
| | | |
| | | final long importTime = System.currentTimeMillis() - startTime; |
| | | float rate = 0; |
| | | if (importTime > 0) |
| | | { |
| | | rate = 1000f * source.getEntriesRead() / importTime; |
| | | } |
| | | logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source |
| | | .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate); |
| | | final long importTime = System.currentTimeMillis() - startTime; |
| | | float rate = 0; |
| | | if (importTime > 0) |
| | | { |
| | | rate = 1000f * source.getEntriesRead() / importTime; |
| | | } |
| | | logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source |
| | | .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate); |
| | | |
| | | return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source |
| | | .getEntriesIgnored()); |
| | | return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source |
| | | .getEntriesIgnored()); |
| | | } |
| | | } |
| | | |
| | | private static int getDefaultNumberOfThread() |
| | |
| | | return; |
| | | } |
| | | rootContainer.getStorage().close(); |
| | | final int threadCount = getDefaultNumberOfThread(); |
| | | final int nbBuffer = 2 * indexesToRebuild.size() * threadCount; |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | |
| | | final OnDiskMergeImporter importer; |
| | | final File tempDir = prepareTempDir(backendCfg, tmpDirectory); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport(); |
| | | final BufferPool bufferPool = newBufferPool(nbBuffer)) |
| | | final int nbBuffersPerThread = 2 * indexesToRebuild.size(); |
| | | try (final BufferPool bufferPool = newBufferPool(getDefaultNumberOfThread(), nbBuffersPerThread)) |
| | | { |
| | | final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy( |
| | | rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild); |
| | | final int threadCount = bufferPool.size() / nbBuffersPerThread; |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | |
| | | importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy); |
| | | importer.doImport( |
| | | new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries)); |
| | | } |
| | | finally |
| | | { |
| | | sorter.shutdown(); |
| | | recursiveDelete(tempDir); |
| | | } |
| | | final OnDiskMergeImporter importer; |
| | | final File tempDir = prepareTempDir(backendCfg, tmpDirectory); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport()) |
| | | { |
| | | final AbstractTwoPhaseImportStrategy strategy = |
| | | new RebuildIndexStrategy( |
| | | rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild); |
| | | |
| | | final long totalTime = importer.getTotalTimeInMillis(); |
| | | final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0; |
| | | logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate); |
| | | importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy); |
| | | importer.doImport(new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, |
| | | totalEntries)); |
| | | } |
| | | finally |
| | | { |
| | | sorter.shutdown(); |
| | | recursiveDelete(tempDir); |
| | | } |
| | | final long totalTime = importer.getTotalTimeInMillis(); |
| | | final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0; |
| | | logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate); |
| | | } |
| | | } |
| | | |
| | | public BufferPool newBufferPool(final int nbBuffers) throws InitializationException |
| | | /** |
| | | * Tries to allocate a {@link BufferPool} whose number of buffers is {@code nbBuffers} multiplied by a thread count |
| | | * in the range [1, {@code maxThreadCount}], depending on the amount of memory available. |
| | | */ |
| | | BufferPool newBufferPool(final int maxThreadCount, final int nbBuffers) throws InitializationException |
| | | { |
| | | final int initialThreadCount = maxThreadCount; |
| | | final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize(); |
| | | final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0; |
| | | if (offHeapMemoryAvailable > 0) |
| | | boolean useOffHeap = (offheapMemorySize != null && offheapMemorySize > 0); |
| | | long memoryAvailable = |
| | | useOffHeap ? offheapMemorySize.longValue() : calculateAvailableHeapMemoryForBuffersAfterGC(); |
| | | int threadCount = initialThreadCount; |
| | | for (;;) |
| | | { |
| | | // Try off-heap direct buffer |
| | | logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers); |
| | | int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers); |
| | | while (bufferSize > MIN_BUFFER_SIZE) |
| | | final int nbRequiredBuffers = threadCount * nbBuffers; |
| | | try |
| | | { |
| | | try |
| | | return useOffHeap |
| | | ? newOffHeapBufferPool(nbRequiredBuffers, memoryAvailable) |
| | | : newHeapBufferPool(nbRequiredBuffers, memoryAvailable); |
| | | } |
| | | catch (InitializationException e) |
| | | { |
| | | if (threadCount > 1) |
| | | { |
| | | final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true); |
| | | final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB; |
| | | logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, |
| | | DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB); |
| | | return pool; |
| | | // Retry using less buffer by reducing the number of importer threads. |
| | | threadCount--; |
| | | } |
| | | catch (OutOfMemoryError e) |
| | | else if (useOffHeap) |
| | | { |
| | | bufferSize /= 2; |
| | | // Retry using on-heap buffer |
| | | useOffHeap = false; |
| | | memoryAvailable = calculateAvailableHeapMemoryForBuffersAfterGC(); |
| | | threadCount = initialThreadCount; |
| | | } |
| | | else |
| | | { |
| | | throw e; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Off-heap memory allocation has failed or is disabled. |
| | | final long availableMemory = calculateAvailableHeapMemoryForBuffers(); |
| | | logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers); |
| | | /** |
| | | * Allocates a pool of {@code nbBuffers} off-heap (direct) buffers, sharing {@code offHeapMemoryAvailable} bytes |
| | | * evenly between them. Each time the allocation fails with an {@link OutOfMemoryError} the buffer size is halved |
| | | * and the allocation is retried, for as long as the buffer size stays above {@code MIN_BUFFER_SIZE}. |
| | | * |
| | | * @param nbBuffers number of buffers the pool must contain |
| | | * @param offHeapMemoryAvailable amount of off-heap memory, in bytes, to share between the buffers |
| | | * @return a pool of direct buffers |
| | | * @throws InitializationException if no pool with buffers larger than {@code MIN_BUFFER_SIZE} could be allocated |
| | | */ |
| | | private BufferPool newOffHeapBufferPool(final int nbBuffers, long offHeapMemoryAvailable) |
| | | throws InitializationException |
| | | { |
| | | // Try off-heap direct buffer |
| | | logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers); |
| | | // A buffer capacity is an int: clamp the per-buffer share before narrowing so that a large off-heap budget |
| | | // cannot wrap around to a negative or tiny size and be mistaken for a lack of memory. |
| | | final long requestedBufferSize = offHeapMemoryAvailable / nbBuffers; |
| | | int bufferSize = (int) Math.min(requestedBufferSize, Integer.MAX_VALUE); |
| | | while (bufferSize > MIN_BUFFER_SIZE) |
| | | { |
| | | BufferPool pool = null; |
| | | try |
| | | { |
| | | pool = new BufferPool(nbBuffers, bufferSize, true); |
| | | final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB; |
| | | logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, |
| | | bufferSize / KB); |
| | | return pool; |
| | | } |
| | | catch (OutOfMemoryError e) |
| | | { |
| | | // Retry with buffers half the size; release whatever this attempt managed to allocate. |
| | | bufferSize /= 2; |
| | | closeSilently(pool); |
| | | pool = null; |
| | | } |
| | | } |
| | | // Compute the minimum requirement as a long so that the reported value cannot overflow. |
| | | final long minimumRequiredMemory = (long) nbBuffers * MIN_BUFFER_SIZE; |
| | | throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapMemoryAvailable, minimumRequiredMemory)); |
| | | } |
| | | |
| | | private BufferPool newHeapBufferPool(final int nbBuffers, final long heapMemoryAvailable) |
| | | throws InitializationException |
| | | { |
| | | final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY; |
| | | if (availableMemory < minimumRequiredMemory) |
| | | if (heapMemoryAvailable < minimumRequiredMemory) |
| | | { |
| | | // Not enough memory. |
| | | throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory)); |
| | | throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(heapMemoryAvailable, minimumRequiredMemory)); |
| | | } |
| | | final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY; |
| | | logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, heapMemoryAvailable, nbBuffers); |
| | | final long buffersMemory = heapMemoryAvailable - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY; |
| | | final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE); |
| | | logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize); |
| | | return new BufferPool(nbBuffers, bufferSize, false); |
| | |
| | | * Calculates the amount of available memory which can be used by this import, taking into account whether |
| | | * the import is running offline or online as a task. |
| | | */ |
| | | private long calculateAvailableHeapMemoryForBuffers() |
| | | private long calculateAvailableHeapMemoryForBuffersAfterGC() |
| | | { |
| | | final long totalAvailableMemory = getAvailableMemoryAfterGC(); |
| | | int importMemPct = 100; |
| | | if (totalAvailableMemory <= SMALL_HEAP_SIZE) |
| | | { |
| | | // Be pessimistic when memory is low. |
| | | importMemPct -= 65; |
| | | } |
| | | return (totalAvailableMemory * importMemPct / 100); |
| | | } |
| | | } |
| | | final Runtime runTime = Runtime.getRuntime(); |
| | | runTime.gc(); |
| | | runTime.gc(); |
| | | |
| | | private static long getAvailableMemoryAfterGC() |
| | | { |
| | | final Runtime runTime = Runtime.getRuntime(); |
| | | runTime.gc(); |
| | | runTime.gc(); |
| | | |
| | | // First try calculation based on oldgen size |
| | | final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans(); |
| | | for (final MemoryPoolMXBean mpool : mpools) |
| | | { |
| | | final MemoryUsage usage = mpool.getUsage(); |
| | | if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0) |
| | | // First try calculation based on oldgen size |
| | | final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans(); |
| | | for (final MemoryPoolMXBean mpool : mpools) |
| | | { |
| | | final long max = usage.getMax(); |
| | | final long hardLimit = (long) (max * 0.90); |
| | | final long softLimit = (long) (max * 0.69); |
| | | final long used = usage.getUsed(); |
| | | return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used); |
| | | final MemoryUsage usage = mpool.getUsage(); |
| | | if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0) |
| | | { |
| | | final long max = usage.getMax(); |
| | | return (max > SMALL_HEAP_SIZE ? (max * 90 / 100) : (max * 70 / 100)); |
| | | } |
| | | } |
| | | // Fall back to 40% of overall heap size (no need to do gc() again). |
| | | return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100; |
| | | } |
| | | // Fall back to 40% of overall heap size (no need to do gc() again). |
| | | return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100; |
| | | } |
| | | |
| | | /** Source of LDAP {@link Entry}s to process. */ |
| | |
| | | /** Max size of phase one buffer. */ |
| | | private static final int MAX_BUFFER_SIZE = 4 * MB; |
| | | /** Min size of phase one buffer. */ |
| | | private static final int MIN_BUFFER_SIZE = 4 * KB; |
| | | private static final int MIN_BUFFER_SIZE = 32 * KB; |
| | | /** DB cache size to use during import. */ |
| | | private static final int DB_CACHE_SIZE = 32 * MB; |
| | | /** Required free memory for this importer. */ |
| | |
| | | BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect) |
| | | { |
| | | this.pool = new ArrayBlockingQueue<>(nbBuffer); |
| | | for (int i = 0; i < nbBuffer; i++) |
| | | try |
| | | { |
| | | pool.offer(new MemoryBuffer(allocateDirect |
| | | ? ByteBuffer.allocateDirect(bufferSize) |
| | | : ByteBuffer.allocate(bufferSize))); |
| | | for (int i = 0; i < nbBuffer; i++) |
| | | { |
| | | pool.offer(new MemoryBuffer(allocateDirect |
| | | ? ByteBuffer.allocateDirect(bufferSize) |
| | | : ByteBuffer.allocate(bufferSize))); |
| | | } |
| | | } |
| | | catch(OutOfMemoryError e) |
| | | { |
| | | close(); |
| | | throw e; |
| | | } |
| | | } |
| | | |
| | | private int size() |
| | | { |
| | | return pool.size(); |
| | | } |
| | | |
| | | private MemoryBuffer get() |