From 134f26626acf0f76f0e999875f3c5a90c7b16034 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 30 Jun 2016 08:47:04 +0000
Subject: [PATCH] OPENDJ-3171: rebuild-index/importer are killed on low-memory configuration.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java                       |  103 +++++++++++++------------
 opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java                                                |   91 ----------------------
 opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml |   18 ++++
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java                               |    8 +-
 opendj-server-legacy/resource/schema/02-config.ldif                                                                    |    9 ++
 5 files changed, 85 insertions(+), 144 deletions(-)

diff --git a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml
index f6bfb87..466ab3e 100644
--- a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml
+++ b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/PluggableBackendConfiguration.xml
@@ -375,4 +375,22 @@
       </ldap:attribute>
     </adm:profile>
   </adm:property>
+  <adm:property name="import-offheap-memory-size" advanced="true">
+    <adm:synopsis>
+      Specifies the amount of off-heap memory dedicated to the online operation (import-ldif, rebuild-index).
+    </adm:synopsis>
+    <adm:default-behavior>
+      <adm:alias>
+        <adm:synopsis>Use only heap memory.</adm:synopsis>
+      </adm:alias>
+    </adm:default-behavior>
+    <adm:syntax>
+        <adm:size lower-limit="0 MB" />
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-import-offheap-memory-size</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
 </adm:managed-object>
diff --git a/opendj-server-legacy/resource/schema/02-config.ldif b/opendj-server-legacy/resource/schema/02-config.ldif
index 6c8531a..e6049f2 100644
--- a/opendj-server-legacy/resource/schema/02-config.ldif
+++ b/opendj-server-legacy/resource/schema/02-config.ldif
@@ -3943,6 +3943,12 @@
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
   SINGLE-VALUE
   X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.185
+  NAME 'ds-cfg-import-offheap-memory-size'
+  EQUALITY caseIgnoreMatch
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDJ Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler'
   SUP top
@@ -5865,7 +5871,8 @@
         ds-cfg-confidentiality-enabled $
         ds-cfg-cipher-transformation $
         ds-cfg-cipher-key-length $
-        ds-cfg-index-filter-analyzer-max-filters )
+        ds-cfg-index-filter-analyzer-max-filters $
+        ds-cfg-import-offheap-memory-size )
   X-ORIGIN 'OpenDJ Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.36733.2.1.2.23
   NAME 'ds-cfg-pdb-backend'
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
index 4c65bec..517e3b0 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -654,7 +654,7 @@
       }
       rootContainer = newRootContainer(AccessMode.READ_WRITE);
       rootContainer.getStorage().close();
-      return getImportStrategy(serverContext, rootContainer).importLDIF(importConfig);
+      return getImportStrategy(rootContainer).importLDIF(importConfig);
     }
     catch (StorageRuntimeException e)
     {
@@ -696,9 +696,9 @@
     }
   }
 
-  private ImportStrategy getImportStrategy(ServerContext serverContext, RootContainer rootContainer)
+  private ImportStrategy getImportStrategy(RootContainer rootContainer)
   {
-    return new OnDiskMergeImporter.StrategyImpl(serverContext, rootContainer, cfg);
+    return new OnDiskMergeImporter.StrategyImpl(rootContainer, cfg);
   }
 
   @Override
@@ -769,7 +769,7 @@
       {
         rootContainer = newRootContainer(AccessMode.READ_WRITE);
       }
-      getImportStrategy(serverContext, rootContainer).rebuildIndex(rebuildConfig);
+      getImportStrategy(rootContainer).rebuildIndex(rebuildConfig);
     }
     catch (ConfigException ce)
     {
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index a80c6d9..7d1ec5d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -42,6 +42,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
@@ -121,13 +124,11 @@
 import org.opends.server.backends.pluggable.spi.WriteOperation;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.ServerContext;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.LDIFImportResult;
-import org.opends.server.util.Platform;
 
 import com.forgerock.opendj.util.OperatingSystem;
 import com.forgerock.opendj.util.PackedLong;
@@ -169,13 +170,11 @@
     /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
     private static final int SMALL_HEAP_SIZE = 256 * MB;
 
-    private final ServerContext serverContext;
     private final RootContainer rootContainer;
     private final PluggableBackendCfg backendCfg;
 
-    StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
+    StrategyImpl(RootContainer rootContainer, PluggableBackendCfg backendCfg)
     {
-      this.serverContext = serverContext;
       this.rootContainer = rootContainer;
       this.backendCfg = backendCfg;
     }
@@ -337,40 +336,43 @@
       logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
     }
 
-    public BufferPool newBufferPool(int nbBuffers) throws InitializationException
+    public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
     {
-      // Try off-heap direct buffer
-      int bufferSize = MAX_BUFFER_SIZE;
-      do
+      final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
+      final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
+      if (offHeapMemoryAvailable > 0)
       {
-        try
+        // Try off-heap direct buffer
+        logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
+        int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
+        while (bufferSize > MIN_BUFFER_SIZE)
         {
-          final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
-          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
-              DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
-          return pool;
-        }
-        catch (OutOfMemoryError e)
-        {
-          bufferSize /= 2;
+          try
+          {
+            final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
+            final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
+            logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
+                DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
+            return pool;
+          }
+          catch (OutOfMemoryError e)
+          {
+            bufferSize /= 2;
+          }
         }
       }
-      while (bufferSize > MIN_BUFFER_SIZE);
 
-      // Off-line mode or direct memory allocation failed.
+      // Off-heap memory allocation has failed or is disabled.
       final long availableMemory = calculateAvailableHeapMemoryForBuffers();
       logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
-      long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
+      final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
       if (availableMemory < minimumRequiredMemory)
       {
         // Not enough memory.
         throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
       }
-      bufferSize = (int) (availableMemory / nbBuffers);
-      if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
-      {
-        bufferSize = MAX_BUFFER_SIZE;
-      }
+      final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
+      final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
       logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
       return new BufferPool(nbBuffers, bufferSize, false);
     }
@@ -477,7 +479,6 @@
       throw new InitializationException(ERR_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(name));
     }
 
-
     private static Collection<String> getIndexNames(IndexType indexType, AttributeIndex attrIndex)
     {
       if (indexType.equals(IndexType.PRESENCE))
@@ -521,35 +522,41 @@
      */
     private long calculateAvailableHeapMemoryForBuffers()
     {
-      final long totalAvailableMemory;
-      if (DirectoryServer.isRunning())
-      {
-        // Online import/rebuild.
-        final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
-        totalAvailableMemory = Math.max(availableMemory, 16 * MB);
-      }
-      else
-      {
-        // Offline import/rebuild.
-        // call twice gc to ensure finalizers are called
-        // and young to old gen references are properly gc'd
-        Runtime.getRuntime().gc();
-        Runtime.getRuntime().gc();
-
-        totalAvailableMemory = Platform.getUsableMemoryForCaching();
-      }
-
-      // Now take into account various fudge factors.
-      int importMemPct = 90;
+      final long totalAvailableMemory = getAvailableMemoryAfterGC();
+      int importMemPct = 100;
       if (totalAvailableMemory <= SMALL_HEAP_SIZE)
       {
         // Be pessimistic when memory is low.
-        importMemPct -= 25;
+        importMemPct -= 65;
       }
       return (totalAvailableMemory * importMemPct / 100);
     }
   }
 
+  private static long getAvailableMemoryAfterGC()
+  {
+    final Runtime runTime = Runtime.getRuntime();
+    runTime.gc();
+    runTime.gc();
+
+    // First try calculation based on oldgen size
+    final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
+    for (final MemoryPoolMXBean mpool : mpools)
+    {
+      final MemoryUsage usage = mpool.getUsage();
+      if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+      {
+        final long max = usage.getMax();
+        final long hardLimit = (long) (max * 0.90);
+        final long softLimit = (long) (max * 0.69);
+        final long used = usage.getUsed();
+        return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
+      }
+    }
+    // Fall back to 40% of overall heap size (no need to do gc() again).
+    return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
+  }
+
   /** Source of LDAP {@link Entry}s to process. */
   private interface Source
   {
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
index 8f6d878..ce0c8a2 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
@@ -25,14 +25,10 @@
 import java.security.cert.Certificate;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
-import java.util.List;
 
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.management.MemoryUsage;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 
@@ -411,65 +407,6 @@
     {
       return cert.getSubjectDN().equals(cert.getIssuerDN());
     }
-
-
-
-    private long getUsableMemoryForCaching()
-    {
-      long youngGenSize = 0;
-      long oldGenSize = 0;
-
-      List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
-      for (MemoryPoolMXBean mpool : mpools)
-      {
-        MemoryUsage usage = mpool.getUsage();
-        if (usage != null)
-        {
-          String name = mpool.getName();
-          if (name.equalsIgnoreCase("PS Eden Space"))
-          {
-            // Parallel.
-            youngGenSize = usage.getMax();
-          }
-          else if (name.equalsIgnoreCase("PS Old Gen"))
-          {
-            // Parallel.
-            oldGenSize = usage.getMax();
-          }
-          else if (name.equalsIgnoreCase("Par Eden Space"))
-          {
-            // CMS.
-            youngGenSize = usage.getMax();
-          }
-          else if (name.equalsIgnoreCase("CMS Old Gen"))
-          {
-            // CMS.
-            oldGenSize = usage.getMax();
-          }
-        }
-      }
-
-      if (youngGenSize > 0 && oldGenSize > youngGenSize)
-      {
-        // We can calculate available memory based on GC info.
-        return oldGenSize - youngGenSize;
-      }
-      else if (oldGenSize > 0)
-      {
-        // Small old gen. It is going to be difficult to avoid full GCs if the
-        // young gen is bigger.
-        return oldGenSize * 40 / 100;
-      }
-      else
-      {
-        // Unknown GC (G1, JRocket, etc).
-        Runtime runTime = Runtime.getRuntime();
-        runTime.gc();
-        runTime.gc();
-        return (runTime.freeMemory() + (runTime.maxMemory() - runTime
-            .totalMemory())) * 40 / 100;
-      }
-    }
   }
 
 
@@ -587,34 +524,6 @@
     return javaVendor.startsWith(vendor);
   }
 
-
-
-  /**
-   * Calculates the usable memory which could potentially be used by the
-   * application for caching objects. This method <b>does not</b> look at the
-   * amount of free memory, but instead tries to query the JVM's GC settings in
-   * order to determine the amount of usable memory in the old generation (or
-   * equivalent). More specifically, applications may also need to take into
-   * account the amount of memory already in use, for example by performing the
-   * following:
-   *
-   * <pre>
-   * Runtime runTime = Runtime.getRuntime();
-   * runTime.gc();
-   * runTime.gc();
-   * long freeCommittedMemory = runTime.freeMemory();
-   * long uncommittedMemory = runTime.maxMemory() - runTime.totalMemory();
-   * long freeMemory = freeCommittedMemory + uncommittedMemory;
-   * </pre>
-   *
-   * @return The usable memory which could potentially be used by the
-   *         application for caching objects.
-   */
-  public static long getUsableMemoryForCaching()
-  {
-    return IMPL.getUsableMemoryForCaching();
-  }
-
   /**
    * Computes the number of replay/worker/cleaner threads based on the number of cpus in the system.
    * Allows for a multiplier to be specified and a minimum value to be returned if not enough processors

--
Gitblit v1.10.0