From 134f26626acf0f76f0e999875f3c5a90c7b16034 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 30 Jun 2016 08:47:04 +0000
Subject: [PATCH] OPENDJ-3171: rebuild-index/importer are killed on low-memory configuration.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java |  103 +++++++++++++++++++++++++++------------------------
 1 files changed, 55 insertions(+), 48 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index a80c6d9..7d1ec5d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -42,6 +42,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
@@ -121,13 +124,11 @@
 import org.opends.server.backends.pluggable.spi.WriteOperation;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.ServerContext;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.LDIFImportResult;
-import org.opends.server.util.Platform;
 
 import com.forgerock.opendj.util.OperatingSystem;
 import com.forgerock.opendj.util.PackedLong;
@@ -169,13 +170,11 @@
     /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
     private static final int SMALL_HEAP_SIZE = 256 * MB;
 
-    private final ServerContext serverContext;
     private final RootContainer rootContainer;
     private final PluggableBackendCfg backendCfg;
 
-    StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
+    StrategyImpl(RootContainer rootContainer, PluggableBackendCfg backendCfg)
     {
-      this.serverContext = serverContext;
       this.rootContainer = rootContainer;
       this.backendCfg = backendCfg;
     }
@@ -337,40 +336,43 @@
       logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
     }
 
-    public BufferPool newBufferPool(int nbBuffers) throws InitializationException
+    public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
     {
-      // Try off-heap direct buffer
-      int bufferSize = MAX_BUFFER_SIZE;
-      do
+      final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
+      final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
+      if (offHeapMemoryAvailable > 0)
       {
-        try
+        // Try off-heap direct buffer
+        logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
+        int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
+        while (bufferSize > MIN_BUFFER_SIZE)
         {
-          final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
-          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
-              DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
-          return pool;
-        }
-        catch (OutOfMemoryError e)
-        {
-          bufferSize /= 2;
+          try
+          {
+            final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
+            final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
+            logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
+                DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
+            return pool;
+          }
+          catch (OutOfMemoryError e)
+          {
+            bufferSize /= 2;
+          }
         }
       }
-      while (bufferSize > MIN_BUFFER_SIZE);
 
-      // Off-line mode or direct memory allocation failed.
+      // Off-heap memory allocation has failed or is disabled.
       final long availableMemory = calculateAvailableHeapMemoryForBuffers();
       logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
-      long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
+      final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
       if (availableMemory < minimumRequiredMemory)
       {
         // Not enough memory.
         throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
       }
-      bufferSize = (int) (availableMemory / nbBuffers);
-      if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
-      {
-        bufferSize = MAX_BUFFER_SIZE;
-      }
+      final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
+      final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
       logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
       return new BufferPool(nbBuffers, bufferSize, false);
     }
@@ -477,7 +479,6 @@
       throw new InitializationException(ERR_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(name));
     }
 
-
     private static Collection<String> getIndexNames(IndexType indexType, AttributeIndex attrIndex)
     {
       if (indexType.equals(IndexType.PRESENCE))
@@ -521,35 +522,41 @@
      */
     private long calculateAvailableHeapMemoryForBuffers()
     {
-      final long totalAvailableMemory;
-      if (DirectoryServer.isRunning())
-      {
-        // Online import/rebuild.
-        final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
-        totalAvailableMemory = Math.max(availableMemory, 16 * MB);
-      }
-      else
-      {
-        // Offline import/rebuild.
-        // call twice gc to ensure finalizers are called
-        // and young to old gen references are properly gc'd
-        Runtime.getRuntime().gc();
-        Runtime.getRuntime().gc();
-
-        totalAvailableMemory = Platform.getUsableMemoryForCaching();
-      }
-
-      // Now take into account various fudge factors.
-      int importMemPct = 90;
+      final long totalAvailableMemory = getAvailableMemoryAfterGC();
+      int importMemPct = 100;
       if (totalAvailableMemory <= SMALL_HEAP_SIZE)
       {
         // Be pessimistic when memory is low.
-        importMemPct -= 25;
+        importMemPct -= 65;
       }
       return (totalAvailableMemory * importMemPct / 100);
     }
   }
 
+  private static long getAvailableMemoryAfterGC()
+  {
+    final Runtime runTime = Runtime.getRuntime();
+    runTime.gc();
+    runTime.gc();
+
+    // First try calculation based on oldgen size
+    final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
+    for (final MemoryPoolMXBean mpool : mpools)
+    {
+      final MemoryUsage usage = mpool.getUsage();
+      if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+      {
+        final long max = usage.getMax();
+        final long hardLimit = (long) (max * 0.90);
+        final long softLimit = (long) (max * 0.69);
+        final long used = usage.getUsed();
+        return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
+      }
+    }
+    // Fall back to 40% of overall heap size (no need to do gc() again).
+    return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
+  }
+
   /** Source of LDAP {@link Entry}s to process. */
   private interface Source
   {

--
Gitblit v1.10.0