From 997c10c4c5946d76f99f1f2ab9d0786ead1e545c Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 30 Jun 2016 12:06:48 +0000
Subject: [PATCH] OPENDJ-3171: rebuild-index/importer are killed on low-memory configuration.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java |  294 ++++++++++++++++++++++++++++++++++------------------------
 1 files changed, 171 insertions(+), 123 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 7d1ec5d..6912373 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -182,58 +182,60 @@
     @Override
     public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
     {
-      final int threadCount =
-          importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
-      final int indexCount = getIndexCount();
-
-      final int nbRequiredBuffers = threadCount * indexCount * 2;
       logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
-      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
 
       final long startTime = System.currentTimeMillis();
-      final OnDiskMergeImporter importer;
-      final ExecutorService sorter =
-          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
-      final LDIFReaderSource source =
-          new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
-      final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
-      try (final Importer dbStorage = rootContainer.getStorage().startImport();
-           final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
+      final int maxThreadCount = importConfig.getThreadCount() == 0
+          ? getDefaultNumberOfThread()
+          : importConfig.getThreadCount();
+      final int nbBuffersPerThread = 2 * getIndexCount();
+      try(final BufferPool bufferPool = newBufferPool(maxThreadCount, nbBuffersPerThread)) 
       {
-        final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
-        final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
-            ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
-            : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
-
-        importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
-        importer.doImport(source);
-      }
-      finally
-      {
-        sorter.shutdownNow();
-        if (OperatingSystem.isWindows())
+        final int threadCount = bufferPool.size() / nbBuffersPerThread;
+        logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
+        final OnDiskMergeImporter importer;
+        final ExecutorService sorter =
+            Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
+        final LDIFReaderSource source =
+            new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
+        final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
+        try (final Importer dbStorage = rootContainer.getStorage().startImport())
         {
-          // Try to force the JVM to close mmap()ed file so that they can be deleted.
-          // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
-          System.gc();
-          Runtime.getRuntime().runFinalization();
+          final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
+          final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
+              ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
+              : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
+
+          importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
+          importer.doImport(source);
         }
-        recursiveDelete(tempDir);
-      }
-      logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
-          / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
+        finally
+        {
+          sorter.shutdownNow();
+          if (OperatingSystem.isWindows())
+          {
+            // Try to force the JVM to close mmap()ed file so that they can be deleted.
+            // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
+            System.gc();
+            Runtime.getRuntime().runFinalization();
+          }
+          recursiveDelete(tempDir);
+        }
+        logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
+            / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
 
-      final long importTime = System.currentTimeMillis() - startTime;
-      float rate = 0;
-      if (importTime > 0)
-      {
-        rate = 1000f * source.getEntriesRead() / importTime;
-      }
-      logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
-          .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
+        final long importTime = System.currentTimeMillis() - startTime;
+        float rate = 0;
+        if (importTime > 0)
+        {
+          rate = 1000f * source.getEntriesRead() / importTime;
+        }
+        logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
+            .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
 
-      return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
-          .getEntriesIgnored());
+        return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
+            .getEntriesIgnored());
+      }
     }
 
     private static int getDefaultNumberOfThread()
@@ -308,70 +310,118 @@
         return;
       }
       rootContainer.getStorage().close();
-      final int threadCount = getDefaultNumberOfThread();
-      final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
-      final ExecutorService sorter =
-          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
-
-      final OnDiskMergeImporter importer;
-      final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
-      try (final Importer dbStorage = rootContainer.getStorage().startImport();
-           final BufferPool bufferPool = newBufferPool(nbBuffer))
+      final int nbBuffersPerThread = 2 * indexesToRebuild.size();
+      try (final BufferPool bufferPool = newBufferPool(getDefaultNumberOfThread(), nbBuffersPerThread))
       {
-        final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
-            rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
+        final int threadCount = bufferPool.size() / nbBuffersPerThread;
+        final ExecutorService sorter =
+            Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
 
-        importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
-        importer.doImport(
-            new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries));
-      }
-      finally
-      {
-        sorter.shutdown();
-        recursiveDelete(tempDir);
-      }
+        final OnDiskMergeImporter importer;
+        final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
+        try (final Importer dbStorage = rootContainer.getStorage().startImport())
+        {
+          final AbstractTwoPhaseImportStrategy strategy =
+              new RebuildIndexStrategy(
+                  rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
 
-      final long totalTime = importer.getTotalTimeInMillis();
-      final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
-      logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
+          importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
+          importer.doImport(new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount,
+              totalEntries));
+        }
+        finally
+        {
+          sorter.shutdown();
+          recursiveDelete(tempDir);
+        }
+        final long totalTime = importer.getTotalTimeInMillis();
+        final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
+        logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
+      }
     }
 
-    public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
+    /**
+     * Try to allocate a {@link BufferPool} with a number of buffer in it being a multiple of {@code nbBuffers} in the
+     * range [1, {@code maxThreadCount}] depending of the amount of memory available.
+     */
+    BufferPool newBufferPool(final int maxThreadCount, final int nbBuffers) throws InitializationException
     {
+      final int initialThreadCount = maxThreadCount;
       final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
-      final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
-      if (offHeapMemoryAvailable > 0)
+      boolean useOffHeap = (offheapMemorySize != null && offheapMemorySize > 0);
+      long memoryAvailable =
+          useOffHeap ? offheapMemorySize.longValue() : calculateAvailableHeapMemoryForBuffersAfterGC();
+      int threadCount = initialThreadCount;
+      for (;;)
       {
-        // Try off-heap direct buffer
-        logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
-        int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
-        while (bufferSize > MIN_BUFFER_SIZE)
+        final int nbRequiredBuffers = threadCount * nbBuffers;
+        try
         {
-          try
+          return useOffHeap
+              ? newOffHeapBufferPool(nbRequiredBuffers, memoryAvailable)
+              : newHeapBufferPool(nbRequiredBuffers, memoryAvailable);
+        }
+        catch (InitializationException e)
+        {
+          if (threadCount > 1)
           {
-            final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
-            final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
-            logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
-                DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
-            return pool;
+            // Retry using less buffer by reducing the number of importer threads.
+            threadCount--;
           }
-          catch (OutOfMemoryError e)
+          else if (useOffHeap)
           {
-            bufferSize /= 2;
+            // Retry using on-heap buffer
+            useOffHeap = false;
+            memoryAvailable = calculateAvailableHeapMemoryForBuffersAfterGC();
+            threadCount = initialThreadCount;
+          }
+          else
+          {
+            throw e;
           }
         }
       }
+    }
 
-      // Off-heap memory allocation has failed or is disabled.
-      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
-      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
+    private BufferPool newOffHeapBufferPool(final int nbBuffers, long offHeapMemoryAvailable)
+        throws InitializationException
+    {
+      // Try off-heap direct buffer
+      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
+      int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
+      while (bufferSize > MIN_BUFFER_SIZE)
+      {
+        BufferPool pool = null;
+        try
+        {
+          pool = new BufferPool(nbBuffers, bufferSize, true);
+          final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
+          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers,
+              bufferSize / KB);
+          return pool;
+        }
+        catch (OutOfMemoryError e)
+        {
+          bufferSize /= 2;
+          closeSilently(pool);
+          pool = null;
+        }
+      }
+      throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapMemoryAvailable, nbBuffers
+          * MIN_BUFFER_SIZE));
+    }
+
+    private BufferPool newHeapBufferPool(final int nbBuffers, final long heapMemoryAvailable)
+        throws InitializationException
+    {
       final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
-      if (availableMemory < minimumRequiredMemory)
+      if (heapMemoryAvailable < minimumRequiredMemory)
       {
         // Not enough memory.
-        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
+        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(heapMemoryAvailable, minimumRequiredMemory));
       }
-      final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
+      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, heapMemoryAvailable, nbBuffers);
+      final long buffersMemory = heapMemoryAvailable - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
       final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
       logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
       return new BufferPool(nbBuffers, bufferSize, false);
@@ -520,41 +570,26 @@
      * Calculates the amount of available memory which can be used by this import, taking into account whether
      * the import is running offline or online as a task.
      */
-    private long calculateAvailableHeapMemoryForBuffers()
+    private long calculateAvailableHeapMemoryForBuffersAfterGC()
     {
-      final long totalAvailableMemory = getAvailableMemoryAfterGC();
-      int importMemPct = 100;
-      if (totalAvailableMemory <= SMALL_HEAP_SIZE)
-      {
-        // Be pessimistic when memory is low.
-        importMemPct -= 65;
-      }
-      return (totalAvailableMemory * importMemPct / 100);
-    }
-  }
+      final Runtime runTime = Runtime.getRuntime();
+      runTime.gc();
+      runTime.gc();
 
-  private static long getAvailableMemoryAfterGC()
-  {
-    final Runtime runTime = Runtime.getRuntime();
-    runTime.gc();
-    runTime.gc();
-
-    // First try calculation based on oldgen size
-    final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
-    for (final MemoryPoolMXBean mpool : mpools)
-    {
-      final MemoryUsage usage = mpool.getUsage();
-      if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+      // First try calculation based on oldgen size
+      final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
+      for (final MemoryPoolMXBean mpool : mpools)
       {
-        final long max = usage.getMax();
-        final long hardLimit = (long) (max * 0.90);
-        final long softLimit = (long) (max * 0.69);
-        final long used = usage.getUsed();
-        return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
+        final MemoryUsage usage = mpool.getUsage();
+        if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
+        {
+          final long max = usage.getMax();
+          return (max > SMALL_HEAP_SIZE ? (max * 90 / 100) : (max * 70 / 100));
+        }
       }
+      // Fall back to 40% of overall heap size (no need to do gc() again).
+      return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
     }
-    // Fall back to 40% of overall heap size (no need to do gc() again).
-    return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
   }
 
   /** Source of LDAP {@link Entry}s to process. */
@@ -849,7 +884,7 @@
   /** Max size of phase one buffer. */
   private static final int MAX_BUFFER_SIZE = 4 * MB;
   /** Min size of phase one buffer. */
-  private static final int MIN_BUFFER_SIZE = 4 * KB;
+  private static final int MIN_BUFFER_SIZE = 32 * KB;
   /** DB cache size to use during import. */
   private static final int DB_CACHE_SIZE = 32 * MB;
   /** Required free memory for this importer. */
@@ -2771,12 +2806,25 @@
     BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
     {
       this.pool = new ArrayBlockingQueue<>(nbBuffer);
-      for (int i = 0; i < nbBuffer; i++)
+      try
       {
-        pool.offer(new MemoryBuffer(allocateDirect
-                        ? ByteBuffer.allocateDirect(bufferSize)
-                        : ByteBuffer.allocate(bufferSize)));
+        for (int i = 0; i < nbBuffer; i++)
+        {
+          pool.offer(new MemoryBuffer(allocateDirect
+                          ? ByteBuffer.allocateDirect(bufferSize)
+                          : ByteBuffer.allocate(bufferSize)));
+        }
       }
+      catch(OutOfMemoryError e)
+      {
+        close();
+        throw e;
+      }
+    }
+
+    private int size()
+    {
+      return pool.size();
     }
 
     private MemoryBuffer get()

--
Gitblit v1.10.0