From acac2a13ebe79697f86272cacd61541955c9d343 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 16 Jun 2016 09:14:13 +0000
Subject: [PATCH] OPENDJ-3123: Reduce memory pressure by limiting the number of thread.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java |  357 +++++++++++++++++------------------------------------------
 1 files changed, 102 insertions(+), 255 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 798ccf4..22a85f6 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -27,7 +27,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -94,7 +93,7 @@
 import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter;
 import org.opends.server.backends.pluggable.DN2ID.TreeVisitor;
 import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
 import org.opends.server.backends.pluggable.spi.Cursor;
 import org.opends.server.backends.pluggable.spi.Importer;
 import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -117,8 +116,7 @@
 import com.forgerock.opendj.util.OperatingSystem;
 import com.forgerock.opendj.util.PackedLong;
 
-// @Checkstyle:ignore
-import sun.misc.Unsafe;
+import net.jcip.annotations.NotThreadSafe;
 
 /**
  * Imports LDIF data contained in files into the database. Because of the B-Tree structure used in backend, import is
@@ -170,41 +168,22 @@
     public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
     {
       final int threadCount =
-          importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors()
-              : importConfig.getThreadCount();
+          importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
       final int indexCount = getIndexCount();
 
-      final int nbBuffer = threadCount * indexCount * 2;
-      final int bufferSize;
-      if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
-      {
-        final long offHeapSize = importConfig.getOffHeapSize();
-        bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
-        if (bufferSize < MIN_BUFFER_SIZE)
-        {
-          // Not enough memory.
-          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
-        }
-        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
-      }
-      else
-      {
-        bufferSize = computeBufferSize(nbBuffer);
-        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
-      }
+      final int nbRequiredBuffers = threadCount * indexCount * 2;
       logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
       logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
 
       final long startTime = System.currentTimeMillis();
       final OnDiskMergeImporter importer;
-      final ExecutorService sorter = Executors.newFixedThreadPool(
-          Runtime.getRuntime().availableProcessors(),
-          newThreadFactory(null, SORTER_THREAD_NAME, true));
+      final ExecutorService sorter =
+          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
       final LDIFReaderSource source =
           new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
       final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
       try (final Importer dbStorage = rootContainer.getStorage().startImport();
-           final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
+           final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
       {
         final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
         final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
@@ -241,6 +220,12 @@
           .getEntriesIgnored());
     }
 
+    private static int getDefaultNumberOfThread()
+    {
+      final int nbProcessors = Runtime.getRuntime().availableProcessors();
+      return Math.max(2, DirectoryServer.isRunning() ? nbProcessors / 2 : nbProcessors);
+    }
+
     private int getIndexCount() throws ConfigException
     {
       int indexCount = 2; // dn2id, dn2uri
@@ -307,29 +292,15 @@
         return;
       }
       rootContainer.getStorage().close();
-      final int threadCount = Runtime.getRuntime().availableProcessors();
+      final int threadCount = getDefaultNumberOfThread();
       final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
-      final int bufferSize;
-      if (BufferPool.SUPPORTS_OFF_HEAP)
-      {
-        bufferSize = MAX_BUFFER_SIZE;
-        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, 
-            DB_CACHE_SIZE, (((long) bufferSize) * nbBuffer) / MB, nbBuffer, bufferSize / KB);
-      }
-      else
-      {
-        bufferSize = computeBufferSize(nbBuffer);
-        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
-      }
-
-      final ExecutorService sorter = Executors.newFixedThreadPool(
-          Runtime.getRuntime().availableProcessors(),
-          newThreadFactory(null, SORTER_THREAD_NAME, true));
+      final ExecutorService sorter =
+          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
 
       final OnDiskMergeImporter importer;
       final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
       try (final Importer dbStorage = rootContainer.getStorage().startImport();
-           final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
+           final BufferPool bufferPool = newBufferPool(nbBuffer))
       {
         final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
             rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
@@ -349,6 +320,44 @@
       logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
     }
 
+    public BufferPool newBufferPool(int nbBuffers) throws InitializationException
+    {
+      // Try off-heap direct buffer
+      int bufferSize = MAX_BUFFER_SIZE;
+      do
+      {
+        try
+        {
+          final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
+          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
+              DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
+          return pool;
+        }
+        catch (OutOfMemoryError e)
+        {
+          bufferSize /= 2;
+        }
+      }
+      while (bufferSize > MIN_BUFFER_SIZE);
+
+      // Off-line mode or direct memory allocation failed.
+      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
+      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
+      long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
+      if (availableMemory < minimumRequiredMemory)
+      {
+        // Not enough memory.
+        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
+      }
+      bufferSize = (int) (availableMemory / nbBuffers);
+      if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
+      {
+        bufferSize = MAX_BUFFER_SIZE;
+      }
+      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
+      return new BufferPool(nbBuffers, bufferSize, false);
+    }
+
     private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig,
         long totalEntries) throws InitializationException
     {
@@ -489,20 +498,6 @@
       return tempDir;
     }
 
-    private int computeBufferSize(int nbBuffer) throws InitializationException
-    {
-      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
-      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
-
-      final long minimumRequiredMemory = nbBuffer * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
-      if (availableMemory < minimumRequiredMemory)
-      {
-        // Not enough memory.
-        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
-      }
-      return Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE);
-    }
-
     /**
      * Calculates the amount of available memory which can be used by this import, taking into account whether
      * the import is running offline or online as a task.
@@ -519,6 +514,11 @@
       else
       {
         // Offline import/rebuild.
+        // call twice gc to ensure finalizers are called
+        // and young to old gen references are properly gc'd
+        Runtime.getRuntime().gc();
+        Runtime.getRuntime().gc();
+
         totalAvailableMemory = Platform.getUsableMemoryForCaching();
       }
 
@@ -823,7 +823,7 @@
   }
 
   /** Max size of phase one buffer. */
-  private static final int MAX_BUFFER_SIZE = 2 * MB;
+  private static final int MAX_BUFFER_SIZE = 4 * MB;
   /** Min size of phase one buffer. */
   private static final int MIN_BUFFER_SIZE = 4 * KB;
   /** DB cache size to use during import. */
@@ -1621,7 +1621,7 @@
     {
       private final String metricName;
       private final BufferPool bufferPool;
-      private final Buffer buffer;
+      private final MemoryBuffer buffer;
       private long totalBytes;
       private int indexPos;
       private int dataPos;
@@ -2727,67 +2727,26 @@
     }
   }
 
-  /** Buffer used by {@link InMemorySortedChunk} to store and sort data. */
-  interface Buffer extends Closeable
-  {
-    void writeInt(int position, int value);
-
-    int readInt(int position);
-
-    ByteString readByteString(int position, int length);
-
-    void writeByteSequence(int position, ByteSequence data);
-
-    int length();
-
-    int compare(int offsetA, int lengthA, int offsetB, int lengthB);
-  }
-
   /**
    * Pre-allocate and maintain a fixed number of re-usable {@code Buffer}s. This allow to keep controls of heap memory
    * consumption and prevents the significant object allocation cost occurring for huge objects.
    */
   static final class BufferPool implements Closeable
   {
-    private static final Object UNSAFE_OBJECT;
-    static final boolean SUPPORTS_OFF_HEAP;
-    static
-    {
-      Object unsafeObject = null;
-      try
-      {
-        final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
-        final Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
-        theUnsafeField.setAccessible(true);
-        unsafeObject = theUnsafeField.get(null);
-      }
-      catch (Throwable e)
-      {
-        // Unsupported.
-      }
-      UNSAFE_OBJECT = unsafeObject;
-      SUPPORTS_OFF_HEAP = UNSAFE_OBJECT != null;
-    }
+    private final BlockingQueue<MemoryBuffer> pool;
 
-    private final BlockingQueue<Buffer> pool;
-    private final int bufferSize;
-
-    BufferPool(int nbBuffer, int bufferSize)
+    BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
     {
       this.pool = new ArrayBlockingQueue<>(nbBuffer);
-      this.bufferSize = bufferSize;
       for (int i = 0; i < nbBuffer; i++)
       {
-        pool.offer(SUPPORTS_OFF_HEAP ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize));
+        pool.offer(new MemoryBuffer(allocateDirect
+                        ? ByteBuffer.allocateDirect(bufferSize)
+                        : ByteBuffer.allocate(bufferSize)));
       }
     }
 
-    public int getBufferSize()
-    {
-      return bufferSize;
-    }
-
-    private Buffer get()
+    private MemoryBuffer get()
     {
       try
       {
@@ -2799,7 +2758,7 @@
       }
     }
 
-    private void release(Buffer buffer)
+    private void release(MemoryBuffer buffer)
     {
       try
       {
@@ -2811,183 +2770,71 @@
       }
     }
 
-    public void setSize(int size)
-    {
-      while (pool.size() > size)
-      {
-        get();
-      }
-    }
-
     @Override
     public void close()
     {
-      Buffer buffer;
-      while ((buffer = pool.poll()) != null)
-      {
-        closeSilently(buffer);
-      }
+      pool.clear();
     }
 
-    /** Off-heap buffer using Unsafe memory access. */
-    @SuppressWarnings("restriction")
-    static final class OffHeapBuffer implements Buffer
-    {
-      private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
-      private static final long BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
-      private final long address;
-      private final int size;
-      private int position;
-      private final OutputStream asOutputStream = new OutputStream()
-      {
-        @Override
-        public void write(int value) throws IOException
-        {
-          UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
-        }
-
-        @Override
-        public void write(byte[] b) throws IOException {
-            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
-            position += b.length;
-        }
-
-        @Override
-        public void write(byte[] b, int off, int len) throws IOException {
-            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
-            position += b.length;
-        }
-      };
-      private boolean closed;
-
-      OffHeapBuffer(int size)
-      {
-        this.size = size;
-        this.address = UNSAFE.allocateMemory(size);
-      }
-
-      @Override
-      public void writeInt(final int position, final int value)
-      {
-        UNSAFE.putInt(address + position, value);
-      }
-
-      @Override
-      public int readInt(final int position)
-      {
-        return UNSAFE.getInt(address + position);
-      }
-
-      @Override
-      public void writeByteSequence(final int position, ByteSequence data)
-      {
-        Reject.ifFalse(position + data.length() <= size);
-        this.position = position;
-        try
-        {
-          data.copyTo(asOutputStream);
-        }
-        catch(IOException e)
-        {
-          throw new StorageRuntimeException(e);
-        }
-      }
-
-      @Override
-      public int length()
-      {
-        return size;
-      }
-
-      @Override
-      public ByteString readByteString(int position, int length)
-      {
-        Reject.ifFalse(position + length <= size);
-
-        final byte[] data = new byte[length];
-        UNSAFE.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length);
-        return ByteString.wrap(data);
-      }
-
-      @Override
-      public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
-      {
-        final int len = Math.min(lengthA, lengthB);
-        for(int i = 0 ; i < len ; i++)
-        {
-          final int a = UNSAFE.getByte(address + offsetA + i) & 0xFF;
-          final int b = UNSAFE.getByte(address + offsetB + i) & 0xFF;
-          if ( a != b )
-          {
-            return a - b;
-          }
-        }
-        return lengthA - lengthB;
-      }
-
-      @Override
-      public void close()
-      {
-        if (!closed)
-        {
-          UNSAFE.freeMemory(address);
-        }
-        closed = true;
-      }
-    }
-
-    /** Heap buffer using ByteBuffer. */
-    static final class HeapBuffer implements Buffer
+    /** Buffer wrapping a ByteBuffer. */
+    @NotThreadSafe
+    static final class MemoryBuffer
     {
       private final ByteBuffer buffer;
 
-      HeapBuffer(int size)
+      MemoryBuffer(final ByteBuffer byteBuffer)
       {
-        this.buffer = ByteBuffer.allocate(size);
+        this.buffer = byteBuffer;
       }
 
-      @Override
-      public void writeInt(final int position, final int value)
+      void writeInt(final int position, final int value)
       {
         buffer.putInt(position, value);
       }
 
-      @Override
-      public int readInt(final int position)
+      int readInt(final int position)
       {
         return buffer.getInt(position);
       }
 
-      @Override
-      public void writeByteSequence(int position, ByteSequence data)
+      void writeByteSequence(int position, ByteSequence data)
       {
         buffer.position(position);
         data.copyTo(buffer);
       }
 
-      @Override
-      public int length()
+      int length()
       {
         return buffer.capacity();
       }
 
-      @Override
-      public ByteString readByteString(int position, int length)
+      ByteString readByteString(int position, int length)
       {
-        return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
+        if (buffer.hasArray())
+        {
+          return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
+        }
+        final byte[] data = new byte[length];
+        buffer.position(position);
+        buffer.get(data);
+        return ByteString.wrap(data);
       }
 
-      @Override
-      public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
+      int compare(int offsetA, int lengthA, int offsetB, int lengthB)
       {
-        return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB));
-      }
-
-      @Override
-      public void close()
-      {
-        // Nothing to do
+        int count = Math.min(lengthA, lengthB);
+        int i = offsetA;
+        int j = offsetB;
+        while (count-- != 0)
+        {
+            final int firstByte = 0xFF & buffer.get(i++);
+            final int secondByte = 0xFF & buffer.get(j++);
+            if (firstByte != secondByte)
+            {
+                return firstByte - secondByte;
+            }
+        }
+        return lengthA - lengthB;
       }
     }
   }

--
Gitblit v1.10.0