From 134f26626acf0f76f0e999875f3c5a90c7b16034 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 30 Jun 2016 08:47:04 +0000
Subject: [PATCH] OPENDJ-3171: rebuild-index/importer are killed on low-memory configuration.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 103 +++++++++++++++++++++++++++------------------------
1 files changed, 55 insertions(+), 48 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index a80c6d9..7d1ec5d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -42,6 +42,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
@@ -121,13 +124,11 @@
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.ServerContext;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
-import org.opends.server.util.Platform;
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
@@ -169,13 +170,11 @@
/** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
private static final int SMALL_HEAP_SIZE = 256 * MB;
- private final ServerContext serverContext;
private final RootContainer rootContainer;
private final PluggableBackendCfg backendCfg;
- StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
+ StrategyImpl(RootContainer rootContainer, PluggableBackendCfg backendCfg)
{
- this.serverContext = serverContext;
this.rootContainer = rootContainer;
this.backendCfg = backendCfg;
}
@@ -337,40 +336,43 @@
logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
}
- public BufferPool newBufferPool(int nbBuffers) throws InitializationException
+ public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
{
- // Try off-heap direct buffer
- int bufferSize = MAX_BUFFER_SIZE;
- do
+ final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
+ final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
+ if (offHeapMemoryAvailable > 0)
{
- try
+ // Try off-heap direct buffer
+ logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
+ int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
+ while (bufferSize > MIN_BUFFER_SIZE)
{
- final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
- logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
- DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
- return pool;
- }
- catch (OutOfMemoryError e)
- {
- bufferSize /= 2;
+ try
+ {
+ final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
+ final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
+ logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
+ DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
+ return pool;
+ }
+ catch (OutOfMemoryError e)
+ {
+ bufferSize /= 2;
+ }
}
}
- while (bufferSize > MIN_BUFFER_SIZE);
- // Off-line mode or direct memory allocation failed.
+ // Off-heap memory allocation has failed or is disabled.
final long availableMemory = calculateAvailableHeapMemoryForBuffers();
logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
- long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
+ final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
if (availableMemory < minimumRequiredMemory)
{
// Not enough memory.
throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
}
- bufferSize = (int) (availableMemory / nbBuffers);
- if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
- {
- bufferSize = MAX_BUFFER_SIZE;
- }
+ final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
+ final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
return new BufferPool(nbBuffers, bufferSize, false);
}
@@ -477,7 +479,6 @@
throw new InitializationException(ERR_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(name));
}
-
private static Collection<String> getIndexNames(IndexType indexType, AttributeIndex attrIndex)
{
if (indexType.equals(IndexType.PRESENCE))
@@ -521,35 +522,41 @@
*/
private long calculateAvailableHeapMemoryForBuffers()
{
- final long totalAvailableMemory;
- if (DirectoryServer.isRunning())
- {
- // Online import/rebuild.
- final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
- totalAvailableMemory = Math.max(availableMemory, 16 * MB);
- }
- else
- {
- // Offline import/rebuild.
- // call twice gc to ensure finalizers are called
- // and young to old gen references are properly gc'd
- Runtime.getRuntime().gc();
- Runtime.getRuntime().gc();
-
- totalAvailableMemory = Platform.getUsableMemoryForCaching();
- }
-
- // Now take into account various fudge factors.
- int importMemPct = 90;
+ final long totalAvailableMemory = getAvailableMemoryAfterGC();
+ int importMemPct = 100;
if (totalAvailableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
- importMemPct -= 25;
+ importMemPct -= 65;
}
return (totalAvailableMemory * importMemPct / 100);
}
}
+ private static long getAvailableMemoryAfterGC()
+ {
+ final Runtime runTime = Runtime.getRuntime();
+ runTime.gc();
+ runTime.gc();
+
+ // First try calculation based on oldgen size
+ final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
+ for (final MemoryPoolMXBean mpool : mpools)
+ {
+ final MemoryUsage usage = mpool.getUsage();
+ if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+ {
+ final long max = usage.getMax();
+ final long hardLimit = (long) (max * 0.90);
+ final long softLimit = (long) (max * 0.69);
+ final long used = usage.getUsed();
+ return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
+ }
+ }
+ // Fall back to 40% of overall heap size (no need to do gc() again).
+ return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
+ }
+
/** Source of LDAP {@link Entry}s to process. */
private interface Source
{
--
Gitblit v1.10.0