From 997c10c4c5946d76f99f1f2ab9d0786ead1e545c Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 30 Jun 2016 12:06:48 +0000
Subject: [PATCH] OPENDJ-3171: rebuild-index/importer are killed on low-memory configuration.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 294 ++++++++++++++++++++++++++++++++++------------------------
1 file changed, 171 insertions(+), 123 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 7d1ec5d..6912373 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -182,58 +182,60 @@
@Override
public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
{
- final int threadCount =
- importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
- final int indexCount = getIndexCount();
-
- final int nbRequiredBuffers = threadCount * indexCount * 2;
logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
- logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
final long startTime = System.currentTimeMillis();
- final OnDiskMergeImporter importer;
- final ExecutorService sorter =
- Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
- final LDIFReaderSource source =
- new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
- final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
- try (final Importer dbStorage = rootContainer.getStorage().startImport();
- final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
+ final int maxThreadCount = importConfig.getThreadCount() == 0
+ ? getDefaultNumberOfThread()
+ : importConfig.getThreadCount();
+ final int nbBuffersPerThread = 2 * getIndexCount();
+ try(final BufferPool bufferPool = newBufferPool(maxThreadCount, nbBuffersPerThread))
{
- final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
- final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
- ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
- : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
-
- importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
- importer.doImport(source);
- }
- finally
- {
- sorter.shutdownNow();
- if (OperatingSystem.isWindows())
+ final int threadCount = bufferPool.size() / nbBuffersPerThread;
+ logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
+ final OnDiskMergeImporter importer;
+ final ExecutorService sorter =
+ Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
+ final LDIFReaderSource source =
+ new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
+ final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
+ try (final Importer dbStorage = rootContainer.getStorage().startImport())
{
- // Try to force the JVM to close mmap()ed file so that they can be deleted.
- // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
- System.gc();
- Runtime.getRuntime().runFinalization();
+ final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
+ final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
+ ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
+ : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
+
+ importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
+ importer.doImport(source);
}
- recursiveDelete(tempDir);
- }
- logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
- / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
+ finally
+ {
+ sorter.shutdownNow();
+ if (OperatingSystem.isWindows())
+ {
+ // Try to force the JVM to close mmap()ed file so that they can be deleted.
+ // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
+ System.gc();
+ Runtime.getRuntime().runFinalization();
+ }
+ recursiveDelete(tempDir);
+ }
+ logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
+ / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
- final long importTime = System.currentTimeMillis() - startTime;
- float rate = 0;
- if (importTime > 0)
- {
- rate = 1000f * source.getEntriesRead() / importTime;
- }
- logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
- .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
+ final long importTime = System.currentTimeMillis() - startTime;
+ float rate = 0;
+ if (importTime > 0)
+ {
+ rate = 1000f * source.getEntriesRead() / importTime;
+ }
+ logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
+ .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
- return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
- .getEntriesIgnored());
+ return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
+ .getEntriesIgnored());
+ }
}
private static int getDefaultNumberOfThread()
@@ -308,70 +310,118 @@
return;
}
rootContainer.getStorage().close();
- final int threadCount = getDefaultNumberOfThread();
- final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
- final ExecutorService sorter =
- Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
-
- final OnDiskMergeImporter importer;
- final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
- try (final Importer dbStorage = rootContainer.getStorage().startImport();
- final BufferPool bufferPool = newBufferPool(nbBuffer))
+ final int nbBuffersPerThread = 2 * indexesToRebuild.size();
+ try (final BufferPool bufferPool = newBufferPool(getDefaultNumberOfThread(), nbBuffersPerThread))
{
- final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
- rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
+ final int threadCount = bufferPool.size() / nbBuffersPerThread;
+ final ExecutorService sorter =
+ Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
- importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
- importer.doImport(
- new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries));
- }
- finally
- {
- sorter.shutdown();
- recursiveDelete(tempDir);
- }
+ final OnDiskMergeImporter importer;
+ final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
+ try (final Importer dbStorage = rootContainer.getStorage().startImport())
+ {
+ final AbstractTwoPhaseImportStrategy strategy =
+ new RebuildIndexStrategy(
+ rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
- final long totalTime = importer.getTotalTimeInMillis();
- final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
- logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
+ importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
+ importer.doImport(new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount,
+ totalEntries));
+ }
+ finally
+ {
+ sorter.shutdown();
+ recursiveDelete(tempDir);
+ }
+ final long totalTime = importer.getTotalTimeInMillis();
+ final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
+ logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
+ }
}
- public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
+ /**
+   * Try to allocate a {@link BufferPool} whose number of buffers is a multiple of {@code nbBuffers} in the
+   * range [1, {@code maxThreadCount}] depending on the amount of memory available.
+ */
+ BufferPool newBufferPool(final int maxThreadCount, final int nbBuffers) throws InitializationException
{
+ final int initialThreadCount = maxThreadCount;
final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
- final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
- if (offHeapMemoryAvailable > 0)
+ boolean useOffHeap = (offheapMemorySize != null && offheapMemorySize > 0);
+ long memoryAvailable =
+ useOffHeap ? offheapMemorySize.longValue() : calculateAvailableHeapMemoryForBuffersAfterGC();
+ int threadCount = initialThreadCount;
+ for (;;)
{
- // Try off-heap direct buffer
- logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
- int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
- while (bufferSize > MIN_BUFFER_SIZE)
+ final int nbRequiredBuffers = threadCount * nbBuffers;
+ try
{
- try
+ return useOffHeap
+ ? newOffHeapBufferPool(nbRequiredBuffers, memoryAvailable)
+ : newHeapBufferPool(nbRequiredBuffers, memoryAvailable);
+ }
+ catch (InitializationException e)
+ {
+ if (threadCount > 1)
{
- final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
- final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
- logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
- DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
- return pool;
+          // Retry using fewer buffers by reducing the number of importer threads.
+ threadCount--;
}
- catch (OutOfMemoryError e)
+ else if (useOffHeap)
{
- bufferSize /= 2;
+ // Retry using on-heap buffer
+ useOffHeap = false;
+ memoryAvailable = calculateAvailableHeapMemoryForBuffersAfterGC();
+ threadCount = initialThreadCount;
+ }
+ else
+ {
+ throw e;
}
}
}
+ }
- // Off-heap memory allocation has failed or is disabled.
- final long availableMemory = calculateAvailableHeapMemoryForBuffers();
- logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
+ private BufferPool newOffHeapBufferPool(final int nbBuffers, long offHeapMemoryAvailable)
+ throws InitializationException
+ {
+ // Try off-heap direct buffer
+ logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
+ int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
+ while (bufferSize > MIN_BUFFER_SIZE)
+ {
+ BufferPool pool = null;
+ try
+ {
+ pool = new BufferPool(nbBuffers, bufferSize, true);
+ final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
+ logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers,
+ bufferSize / KB);
+ return pool;
+ }
+ catch (OutOfMemoryError e)
+ {
+ bufferSize /= 2;
+ closeSilently(pool);
+ pool = null;
+ }
+ }
+ throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapMemoryAvailable, nbBuffers
+ * MIN_BUFFER_SIZE));
+ }
+
+ private BufferPool newHeapBufferPool(final int nbBuffers, final long heapMemoryAvailable)
+ throws InitializationException
+ {
final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
- if (availableMemory < minimumRequiredMemory)
+ if (heapMemoryAvailable < minimumRequiredMemory)
{
// Not enough memory.
- throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
+ throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(heapMemoryAvailable, minimumRequiredMemory));
}
- final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
+ logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, heapMemoryAvailable, nbBuffers);
+ final long buffersMemory = heapMemoryAvailable - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
return new BufferPool(nbBuffers, bufferSize, false);
@@ -520,41 +570,26 @@
* Calculates the amount of available memory which can be used by this import, taking into account whether
* the import is running offline or online as a task.
*/
- private long calculateAvailableHeapMemoryForBuffers()
+ private long calculateAvailableHeapMemoryForBuffersAfterGC()
{
- final long totalAvailableMemory = getAvailableMemoryAfterGC();
- int importMemPct = 100;
- if (totalAvailableMemory <= SMALL_HEAP_SIZE)
- {
- // Be pessimistic when memory is low.
- importMemPct -= 65;
- }
- return (totalAvailableMemory * importMemPct / 100);
- }
- }
+ final Runtime runTime = Runtime.getRuntime();
+ runTime.gc();
+ runTime.gc();
- private static long getAvailableMemoryAfterGC()
- {
- final Runtime runTime = Runtime.getRuntime();
- runTime.gc();
- runTime.gc();
-
- // First try calculation based on oldgen size
- final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
- for (final MemoryPoolMXBean mpool : mpools)
- {
- final MemoryUsage usage = mpool.getUsage();
- if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+ // First try calculation based on oldgen size
+ final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
+ for (final MemoryPoolMXBean mpool : mpools)
{
- final long max = usage.getMax();
- final long hardLimit = (long) (max * 0.90);
- final long softLimit = (long) (max * 0.69);
- final long used = usage.getUsed();
- return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
+ final MemoryUsage usage = mpool.getUsage();
+ if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+ {
+ final long max = usage.getMax();
+ return (max > SMALL_HEAP_SIZE ? (max * 90 / 100) : (max * 70 / 100));
+ }
}
+ // Fall back to 40% of overall heap size (no need to do gc() again).
+ return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
}
- // Fall back to 40% of overall heap size (no need to do gc() again).
- return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
}
/** Source of LDAP {@link Entry}s to process. */
@@ -849,7 +884,7 @@
/** Max size of phase one buffer. */
private static final int MAX_BUFFER_SIZE = 4 * MB;
/** Min size of phase one buffer. */
- private static final int MIN_BUFFER_SIZE = 4 * KB;
+ private static final int MIN_BUFFER_SIZE = 32 * KB;
/** DB cache size to use during import. */
private static final int DB_CACHE_SIZE = 32 * MB;
/** Required free memory for this importer. */
@@ -2771,12 +2806,25 @@
BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
{
this.pool = new ArrayBlockingQueue<>(nbBuffer);
- for (int i = 0; i < nbBuffer; i++)
+ try
{
- pool.offer(new MemoryBuffer(allocateDirect
- ? ByteBuffer.allocateDirect(bufferSize)
- : ByteBuffer.allocate(bufferSize)));
+ for (int i = 0; i < nbBuffer; i++)
+ {
+ pool.offer(new MemoryBuffer(allocateDirect
+ ? ByteBuffer.allocateDirect(bufferSize)
+ : ByteBuffer.allocate(bufferSize)));
+ }
}
+ catch(OutOfMemoryError e)
+ {
+ close();
+ throw e;
+ }
+ }
+
+ private int size()
+ {
+ return pool.size();
}
private MemoryBuffer get()
--
Gitblit v1.10.0