From 134f26626acf0f76f0e999875f3c5a90c7b16034 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 30 Jun 2016 08:47:04 +0000
Subject: [PATCH] OPENDJ-3171: rebuild-index/importer are killed on low-memory configuration.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 103 +++++++++++++------------
opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java | 91 ----------------------
opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml | 18 ++++
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java | 8 +-
opendj-server-legacy/resource/schema/02-config.ldif | 9 ++
5 files changed, 85 insertions(+), 144 deletions(-)
diff --git a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml
index f6bfb87..466ab3e 100644
--- a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml
+++ b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml
@@ -375,4 +375,22 @@
</ldap:attribute>
</adm:profile>
</adm:property>
+ <adm:property name="import-offheap-memory-size" advanced="true">
+ <adm:synopsis>
+ Specifies the amount of off-heap memory dedicated to the online operation (import-ldif, rebuild-index).
+ </adm:synopsis>
+ <adm:default-behavior>
+ <adm:alias>
+ <adm:synopsis>Use only heap memory.</adm:synopsis>
+ </adm:alias>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:size lower-limit="0 MB" />
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-import-offheap-memory-size</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
</adm:managed-object>
diff --git a/opendj-server-legacy/resource/schema/02-config.ldif b/opendj-server-legacy/resource/schema/02-config.ldif
index 6c8531a..e6049f2 100644
--- a/opendj-server-legacy/resource/schema/02-config.ldif
+++ b/opendj-server-legacy/resource/schema/02-config.ldif
@@ -3943,6 +3943,12 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
SINGLE-VALUE
X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.185
+ NAME 'ds-cfg-import-offheap-memory-size'
+ EQUALITY caseIgnoreMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDJ Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler'
SUP top
@@ -5865,7 +5871,8 @@
ds-cfg-confidentiality-enabled $
ds-cfg-cipher-transformation $
ds-cfg-cipher-key-length $
- ds-cfg-index-filter-analyzer-max-filters )
+ ds-cfg-index-filter-analyzer-max-filters $
+ ds-cfg-import-offheap-memory-size )
X-ORIGIN 'OpenDJ Directory Server' )
objectClasses: ( 1.3.6.1.4.1.36733.2.1.2.23
NAME 'ds-cfg-pdb-backend'
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
index 4c65bec..517e3b0 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -654,7 +654,7 @@
}
rootContainer = newRootContainer(AccessMode.READ_WRITE);
rootContainer.getStorage().close();
- return getImportStrategy(serverContext, rootContainer).importLDIF(importConfig);
+ return getImportStrategy(rootContainer).importLDIF(importConfig);
}
catch (StorageRuntimeException e)
{
@@ -696,9 +696,9 @@
}
}
- private ImportStrategy getImportStrategy(ServerContext serverContext, RootContainer rootContainer)
+ private ImportStrategy getImportStrategy(RootContainer rootContainer)
{
- return new OnDiskMergeImporter.StrategyImpl(serverContext, rootContainer, cfg);
+ return new OnDiskMergeImporter.StrategyImpl(rootContainer, cfg);
}
@Override
@@ -769,7 +769,7 @@
{
rootContainer = newRootContainer(AccessMode.READ_WRITE);
}
- getImportStrategy(serverContext, rootContainer).rebuildIndex(rebuildConfig);
+ getImportStrategy(rootContainer).rebuildIndex(rebuildConfig);
}
catch (ConfigException ce)
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index a80c6d9..7d1ec5d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -42,6 +42,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
@@ -121,13 +124,11 @@
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.ServerContext;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
-import org.opends.server.util.Platform;
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
@@ -169,13 +170,11 @@
/** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
private static final int SMALL_HEAP_SIZE = 256 * MB;
- private final ServerContext serverContext;
private final RootContainer rootContainer;
private final PluggableBackendCfg backendCfg;
- StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
+ StrategyImpl(RootContainer rootContainer, PluggableBackendCfg backendCfg)
{
- this.serverContext = serverContext;
this.rootContainer = rootContainer;
this.backendCfg = backendCfg;
}
@@ -337,40 +336,43 @@
logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
}
- public BufferPool newBufferPool(int nbBuffers) throws InitializationException
+ public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
{
- // Try off-heap direct buffer
- int bufferSize = MAX_BUFFER_SIZE;
- do
+ final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
+ final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
+ if (offHeapMemoryAvailable > 0)
{
- try
+ // Try off-heap direct buffer
+ logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
+ int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
+ while (bufferSize > MIN_BUFFER_SIZE)
{
- final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
- logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
- DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
- return pool;
- }
- catch (OutOfMemoryError e)
- {
- bufferSize /= 2;
+ try
+ {
+ final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
+ final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
+ logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
+ DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
+ return pool;
+ }
+ catch (OutOfMemoryError e)
+ {
+ bufferSize /= 2;
+ }
}
}
- while (bufferSize > MIN_BUFFER_SIZE);
- // Off-line mode or direct memory allocation failed.
+ // Off-heap memory allocation has failed or is disabled.
final long availableMemory = calculateAvailableHeapMemoryForBuffers();
logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
- long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
+ final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
if (availableMemory < minimumRequiredMemory)
{
// Not enough memory.
throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
}
- bufferSize = (int) (availableMemory / nbBuffers);
- if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
- {
- bufferSize = MAX_BUFFER_SIZE;
- }
+ final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
+ final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
return new BufferPool(nbBuffers, bufferSize, false);
}
@@ -477,7 +479,6 @@
throw new InitializationException(ERR_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(name));
}
-
private static Collection<String> getIndexNames(IndexType indexType, AttributeIndex attrIndex)
{
if (indexType.equals(IndexType.PRESENCE))
@@ -521,35 +522,41 @@
*/
private long calculateAvailableHeapMemoryForBuffers()
{
- final long totalAvailableMemory;
- if (DirectoryServer.isRunning())
- {
- // Online import/rebuild.
- final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
- totalAvailableMemory = Math.max(availableMemory, 16 * MB);
- }
- else
- {
- // Offline import/rebuild.
- // call twice gc to ensure finalizers are called
- // and young to old gen references are properly gc'd
- Runtime.getRuntime().gc();
- Runtime.getRuntime().gc();
-
- totalAvailableMemory = Platform.getUsableMemoryForCaching();
- }
-
- // Now take into account various fudge factors.
- int importMemPct = 90;
+ final long totalAvailableMemory = getAvailableMemoryAfterGC();
+ int importMemPct = 100;
if (totalAvailableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
- importMemPct -= 25;
+ importMemPct -= 65;
}
return (totalAvailableMemory * importMemPct / 100);
}
}
+ private static long getAvailableMemoryAfterGC()
+ {
+ final Runtime runTime = Runtime.getRuntime();
+ runTime.gc();
+ runTime.gc();
+
+ // First try calculation based on oldgen size
+ final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
+ for (final MemoryPoolMXBean mpool : mpools)
+ {
+ final MemoryUsage usage = mpool.getUsage();
+ if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+ {
+ final long max = usage.getMax();
+ final long hardLimit = (long) (max * 0.90);
+ final long softLimit = (long) (max * 0.69);
+ final long used = usage.getUsed();
+ return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
+ }
+ }
+ // Fall back to 40% of overall heap size (no need to do gc() again).
+ return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
+ }
+
/** Source of LDAP {@link Entry}s to process. */
private interface Source
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
index 8f6d878..ce0c8a2 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
@@ -25,14 +25,10 @@
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
-import java.util.List;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
@@ -411,65 +407,6 @@
{
return cert.getSubjectDN().equals(cert.getIssuerDN());
}
-
-
-
- private long getUsableMemoryForCaching()
- {
- long youngGenSize = 0;
- long oldGenSize = 0;
-
- List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
- for (MemoryPoolMXBean mpool : mpools)
- {
- MemoryUsage usage = mpool.getUsage();
- if (usage != null)
- {
- String name = mpool.getName();
- if (name.equalsIgnoreCase("PS Eden Space"))
- {
- // Parallel.
- youngGenSize = usage.getMax();
- }
- else if (name.equalsIgnoreCase("PS Old Gen"))
- {
- // Parallel.
- oldGenSize = usage.getMax();
- }
- else if (name.equalsIgnoreCase("Par Eden Space"))
- {
- // CMS.
- youngGenSize = usage.getMax();
- }
- else if (name.equalsIgnoreCase("CMS Old Gen"))
- {
- // CMS.
- oldGenSize = usage.getMax();
- }
- }
- }
-
- if (youngGenSize > 0 && oldGenSize > youngGenSize)
- {
- // We can calculate available memory based on GC info.
- return oldGenSize - youngGenSize;
- }
- else if (oldGenSize > 0)
- {
- // Small old gen. It is going to be difficult to avoid full GCs if the
- // young gen is bigger.
- return oldGenSize * 40 / 100;
- }
- else
- {
- // Unknown GC (G1, JRocket, etc).
- Runtime runTime = Runtime.getRuntime();
- runTime.gc();
- runTime.gc();
- return (runTime.freeMemory() + (runTime.maxMemory() - runTime
- .totalMemory())) * 40 / 100;
- }
- }
}
@@ -587,34 +524,6 @@
return javaVendor.startsWith(vendor);
}
-
-
- /**
- * Calculates the usable memory which could potentially be used by the
- * application for caching objects. This method <b>does not</b> look at the
- * amount of free memory, but instead tries to query the JVM's GC settings in
- * order to determine the amount of usable memory in the old generation (or
- * equivalent). More specifically, applications may also need to take into
- * account the amount of memory already in use, for example by performing the
- * following:
- *
- * <pre>
- * Runtime runTime = Runtime.getRuntime();
- * runTime.gc();
- * runTime.gc();
- * long freeCommittedMemory = runTime.freeMemory();
- * long uncommittedMemory = runTime.maxMemory() - runTime.totalMemory();
- * long freeMemory = freeCommittedMemory + uncommittedMemory;
- * </pre>
- *
- * @return The usable memory which could potentially be used by the
- * application for caching objects.
- */
- public static long getUsableMemoryForCaching()
- {
- return IMPL.getUsableMemoryForCaching();
- }
-
/**
* Computes the number of replay/worker/cleaner threads based on the number of cpus in the system.
* Allows for a multiplier to be specified and a minimum value to be returned if not enough processors
--
Gitblit v1.10.0