From 571c3b4cb6142f4ef5eb41c3b8cf50dd9d6c9c81 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Tue, 12 Apr 2016 14:43:12 +0000
Subject: [PATCH] OPENDJ-2631: OOME error while importing 100M entries.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java |  211 +++++++++++++++++++++++++---------------------------
 1 file changed, 102 insertions(+), 109 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 40023b0..2d08f3d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -16,12 +16,11 @@
 package org.opends.server.backends.pluggable;
 
 import static java.nio.channels.FileChannel.*;
-
+import static java.nio.file.StandardOpenOption.*;
 import static org.forgerock.util.Utils.*;
 import static org.opends.messages.BackendMessages.*;
 import static org.opends.server.util.DynamicConstants.*;
 import static org.opends.server.util.StaticUtils.*;
-import static java.nio.file.StandardOpenOption.*;
 
 import java.io.Closeable;
 import java.io.File;
@@ -80,6 +79,7 @@
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.spi.Indexer;
 import org.forgerock.util.Reject;
@@ -107,7 +107,6 @@
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ServerContext;
-import org.forgerock.opendj.ldap.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.InitializationException;
@@ -817,8 +816,6 @@
     }
   }
 
-  /** Default size for the off-heap memory dedicated to the phase one's buffer. */
-  private static final int DEFAULT_OFFHEAP_SIZE = 700;
   /** Max size of phase one buffer. */
   private static final int MAX_BUFFER_SIZE = 2 * MB;
   /** Min size of phase one buffer. */
@@ -1401,7 +1398,7 @@
    * Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
    * unlimited amount of data. This class uses double-buffering: data is first stored in an
    * {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a
-   * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
+   * {@link FileRegion}. Duplicate keys are reduced by a {@link Collector}.
    * {@link #put(ByteSequence, ByteSequence)} is thread-safe.
    * This class is used in phase-one. There is one {@link ExternalSortChunk} per
    * database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
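For illustration, the double-buffering scheme described in that javadoc reduces to the following minimal, self-contained sketch. Everything here (the DoubleBufferingSink name, its capacity, the single-thread executor) is an assumption for the example, not code from this patch:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Fills an in-memory buffer; once full, sorts it in the background and swaps in a fresh one. */
    final class DoubleBufferingSink
    {
      private static final int CAPACITY = 1024;
      private final ExecutorService sorter = Executors.newSingleThreadExecutor();
      private List<String> current = new ArrayList<>(CAPACITY);

      synchronized void put(String record)
      {
        if (current.size() == CAPACITY)
        {
          final List<String> full = current;
          current = new ArrayList<>(CAPACITY); // writers continue immediately on a fresh buffer
          sorter.submit(new Runnable()
          {
            @Override
            public void run()
            {
              Collections.sort(full); // sorted asynchronously, off the writer threads
              // ...append the sorted records to their reserved file region...
            }
          });
        }
        current.add(record);
      }
    }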
@@ -1420,7 +1417,7 @@
     private final Collector<?, ByteString> phaseOneDeduplicator;
     private final Collector<?, ByteString> phaseTwoDeduplicator;
     /** Keep track of pending sorting tasks. */
-    private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
+    private final CompletionService<Region> sorter;
     /** Keep track of currently opened chunks. */
     private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>());
     /** Keep track of the number of chunks created. */
@@ -1473,36 +1470,57 @@
       {
         sortAndAppendChunkAsync(chunk);
       }
+
+      final List<MeteredCursor<ByteString, ByteString>> cursors = new ArrayList<>();
       try
       {
-        return new CollectorCursor<>(
-            new CompositeCursor<ByteString, ByteString>(name,
-                waitTasksTermination(sorter, nbSortedChunks.get()))
-            {
-              public void close()
-              {
-                super.close();
-                if (OperatingSystem.isWindows())
-                {
-                  // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
-                  // To prevent these not deleted files to waste space, we empty it.
-                  try
-                  {
-                    channel.truncate(0);
-                  }
-                  catch (IOException e)
-                  {
-                    // This is best effort, it's safe to ignore the exception here.
-                  }
-                }
-                closeSilently(channel);
-              }
-            }, (Collector<?, ByteString>) phaseTwoDeduplicator);
+        final List<Region> regions = waitTasksTermination(sorter, nbSortedChunks.get());
+        Collections.sort(regions); // Sort regions by their starting offsets.
+        long mmapPosition = 0;
+        // Create as big as possible memory are (handling 2Gb limit) and create as many cursors as regions from
+        // these area.
+        MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get(), Integer.MAX_VALUE));
+        for (Region region : regions)
+        {
+          if ((region.offset + region.size) > (mmapPosition + Integer.MAX_VALUE))
+          {
+            // Handle the 2 GB ByteBuffer limit.
+            mmapPosition = region.offset;
+            mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get() - mmapPosition, Integer.MAX_VALUE));
+          }
+          final ByteBuffer regionBuffer = mmap.duplicate();
+          final int relativeRegionOffset = (int) (region.offset - mmapPosition);
+          regionBuffer.position(relativeRegionOffset).limit(regionBuffer.position() + region.size);
+          cursors.add(new FileRegion.Cursor(name, regionBuffer.slice()));
+        }
       }
-      catch (ExecutionException | InterruptedException e)
+      catch (ExecutionException | InterruptedException | IOException e)
       {
         throw new StorageRuntimeException(e);
       }
+
+      return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
+      {
+        @Override
+        public void close()
+        {
+          super.close();
+          if (OperatingSystem.isWindows())
+          {
+            // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154).
+            // To prevent these undeleted files from wasting space, we truncate them.
+            try
+            {
+              channel.truncate(0);
+            }
+            catch (IOException e)
+            {
+              // This is a best-effort cleanup; it is safe to ignore the exception here.
+            }
+          }
+          closeSilently(channel);
+        }
+      }, (Collector<?, ByteString>) phaseTwoDeduplicator);
     }
 
     @Override
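The createCursor() rewrite above works around the 2 GB limit of a single MappedByteBuffer by mapping read-only windows of at most Integer.MAX_VALUE bytes and slicing one buffer per sorted region, instead of one mmap per chunk. A standalone sketch of that windowing technique follows; RegionMapper, fileSize, and the local Region class are assumptions mirroring the patch, not its actual code:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;
    import java.util.ArrayList;
    import java.util.List;

    final class RegionMapper
    {
      /** Offset/size pair, mirroring the Region value class introduced by this patch. */
      static final class Region
      {
        final long offset;
        final int size;

        Region(long offset, int size)
        {
          this.offset = offset;
          this.size = size;
        }
      }

      /** Maps read-only windows of at most Integer.MAX_VALUE bytes, then slices one buffer per region. */
      static List<ByteBuffer> mapRegions(FileChannel channel, long fileSize, List<Region> regionsSortedByOffset)
          throws IOException
      {
        final List<ByteBuffer> buffers = new ArrayList<>();
        long windowStart = 0;
        MappedByteBuffer window = channel.map(MapMode.READ_ONLY, windowStart, Math.min(fileSize, Integer.MAX_VALUE));
        for (Region region : regionsSortedByOffset)
        {
          if (region.offset + region.size > windowStart + Integer.MAX_VALUE)
          {
            // The region crosses the end of the current window: remap a new window starting at this region.
            windowStart = region.offset;
            window = channel.map(MapMode.READ_ONLY, windowStart, Math.min(fileSize - windowStart, Integer.MAX_VALUE));
          }
          final ByteBuffer view = window.duplicate();
          final int start = (int) (region.offset - windowStart);
          view.position(start).limit(start + region.size);
          buffers.add(view.slice()); // zero-copy view with its own independent position and limit
        }
        return buffers;
      }
    }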
@@ -1528,26 +1546,46 @@
       final long startOffset = filePosition.getAndAdd(chunk.size());
       nbSortedChunks.incrementAndGet();
 
-      sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>()
+      sorter.submit(new Callable<Region>()
       {
         @Override
-        public MeteredCursor<ByteString, ByteString> call() throws Exception
+        public Region call() throws Exception
         {
           /*
-           * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key
-           * de-duplication performed by the CollectorCursor.
+           * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
+           * performed by the CollectorCursor.
            */
-          final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
+          final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
+          final int regionSize;
           try (final SequentialCursor<ByteString, ByteString> source =
               new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
           {
-            copyIntoChunk(source, persistentChunk);
+            regionSize = region.write(source);
           }
-          return persistentChunk.flip();
+          return new Region(startOffset, regionSize);
         }
       });
     }
 
+    /** Defines a region inside a file. */
+    private static final class Region implements Comparable<Region>
+    {
+      private final long offset;
+      private final int size;
+
+      Region(long offset, int size)
+      {
+        this.offset = offset;
+        this.size = size;
+      }
+
+      @Override
+      public int compareTo(Region o)
+      {
+        return Long.compare(offset, o.offset);
+      }
+    }
+
     /**
      * Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that
      * they can be cursored in ascending key order. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation,
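Note how sortAndAppendChunkAsync() reserves file space up front with filePosition.getAndAdd(chunk.size()): each writer atomically claims a disjoint slice of the shared file, so sorted chunks can be appended concurrently without locking the channel. A minimal sketch of that reservation idiom (FileSpaceReserver is an illustrative name, not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    /** Hands out disjoint, contiguous file slices to concurrent writers without locking. */
    final class FileSpaceReserver
    {
      private final AtomicLong filePosition = new AtomicLong();

      long reserve(long size)
      {
        return filePosition.getAndAdd(size); // unique start offset; the next caller begins right after
      }
    }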
@@ -1780,13 +1818,9 @@
      * +------------+--------------+--------------+----------------+
      * </pre>
      */
-    static final class FileRegionChunk implements Chunk
+    static final class FileRegion
     {
-      private final String metricName;
-      private final FileChannel channel;
-      private final long startOffset;
-      private long size;
-      private MappedByteBuffer mmapBuffer;
+      private final MappedByteBuffer mmapBuffer;
       private final OutputStream mmapBufferOS = new OutputStream()
       {
         @Override
@@ -1796,65 +1830,33 @@
         }
       };
 
-      FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException
+      FileRegion(FileChannel channel, long startOffset, long size) throws IOException
       {
-        this.metricName = name;
-        this.channel = channel;
-        this.startOffset = startOffset;
         if (size > 0)
         {
          // Make sure that the file is big enough to contain this memory-mapped region.
           channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1);
         }
-        this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
+        mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
       }
 
-      @Override
-      public boolean put(ByteSequence key, ByteSequence value)
+      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
       {
-        final int recordSize =
-            PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
-                .length();
-        if (mmapBuffer.remaining() < recordSize)
+        while (source.next())
         {
-          // The regions is full
-          return false;
-        }
-
-        try
-        {
+          final ByteSequence key = source.getKey();
+          final ByteSequence value = source.getValue();
           PackedLong.writeCompactUnsigned(mmapBufferOS, key.length());
           PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
+          key.copyTo(mmapBuffer);
+          value.copyTo(mmapBuffer);
         }
-        catch (IOException e)
-        {
-          throw new StorageRuntimeException(e);
-        }
-        key.copyTo(mmapBuffer);
-        value.copyTo(mmapBuffer);
-
-        return true;
-      }
-
-      @Override
-      public long size()
-      {
-        return mmapBuffer == null ? size : mmapBuffer.position();
-      }
-
-      @Override
-      public MeteredCursor<ByteString, ByteString> flip()
-      {
-        size = mmapBuffer.position();
-        mmapBuffer = null;
-        return new FileRegionChunkCursor(startOffset, size);
+        return mmapBuffer.position();
       }
 
       /** Cursor through the specific memory-mapped file's region. */
-      private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
+      static final class Cursor implements MeteredCursor<ByteString, ByteString>
       {
-        private final long regionOffset;
-        private final long regionSize;
         private final InputStream asInputStream = new InputStream()
         {
           @Override
@@ -1863,19 +1865,19 @@
             return region.get() & 0xFF;
           }
         };
+        private final String metricName;
         private ByteBuffer region;
         private ByteString key, value;
 
-        FileRegionChunkCursor(long regionOffset, long regionSize)
+        Cursor(String metricName, ByteBuffer region)
         {
-          this.regionOffset = regionOffset;
-          this.regionSize = regionSize;
+          this.metricName = metricName;
+          this.region = region;
         }
 
         @Override
         public boolean next()
         {
-          ensureRegionIsMemoryMapped();
           if (!region.hasRemaining())
           {
             key = value = null;
@@ -1904,22 +1906,6 @@
           return true;
         }
 
-        private void ensureRegionIsMemoryMapped()
-        {
-          if (region == null)
-          {
-            // Because mmap() regions are a counted and limited resources, we create them lazily.
-            try
-            {
-              region = channel.map(MapMode.READ_ONLY, regionOffset, regionSize);
-            }
-            catch (IOException e)
-            {
-              throw new IllegalStateException(e);
-            }
-          }
-        }
-
         @Override
         public boolean isDefined()
         {
@@ -1962,13 +1948,13 @@
         @Override
         public long getNbBytesRead()
         {
-          return region == null ? 0 : region.position();
+          return region.position();
         }
 
         @Override
         public long getNbBytesTotal()
         {
-          return regionSize;
+          return region.limit();
         }
       }
     }
@@ -2263,7 +2249,10 @@
     long nbRecords = 0;
     while (source.next())
     {
-      destination.put(source.getKey(), source.getValue());
+      if (!destination.put(source.getKey(), source.getValue()))
+      {
+        throw new IllegalStateException("Destination chunk is full, cannot copy remaining records");
+      }
+      }
       nbRecords++;
     }
     return nbRecords;
@@ -2815,6 +2804,7 @@
     }
 
     /** Off-heap buffer using Unsafe memory access. */
+    @SuppressWarnings("restriction")
     static final class OffHeapBuffer implements Buffer
     {
       private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
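For readers unfamiliar with sun.misc.Unsafe, the sketch below shows the basic pattern behind an off-heap buffer: allocate native memory outside the GC heap, access it by raw address, and free it explicitly. It obtains Unsafe reflectively so the example is self-contained, whereas the patch reuses the already-resolved UNSAFE_OBJECT:

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    /** Off-heap byte buffer backed by Unsafe-allocated native memory. */
    @SuppressWarnings("restriction")
    final class NativeBuffer implements AutoCloseable
    {
      private static final Unsafe UNSAFE = loadUnsafe();
      private final long address;
      private final int capacity;

      NativeBuffer(int capacity)
      {
        this.capacity = capacity;
        this.address = UNSAFE.allocateMemory(capacity); // not tracked by the GC heap
      }

      void putByte(int index, byte b)
      {
        if (index < 0 || index >= capacity)
        {
          throw new IndexOutOfBoundsException();
        }
        UNSAFE.putByte(address + index, b);
      }

      byte getByte(int index)
      {
        if (index < 0 || index >= capacity)
        {
          throw new IndexOutOfBoundsException();
        }
        return UNSAFE.getByte(address + index);
      }

      @Override
      public void close()
      {
        UNSAFE.freeMemory(address); // mandatory: native memory is never garbage collected
      }

      private static Unsafe loadUnsafe()
      {
        try
        {
          final Field f = Unsafe.class.getDeclaredField("theUnsafe");
          f.setAccessible(true);
          return (Unsafe) f.get(null);
        }
        catch (ReflectiveOperationException e)
        {
          throw new ExceptionInInitializerError(e);
        }
      }
    }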
@@ -3140,6 +3130,7 @@
   {
     private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>();
 
+    @SuppressWarnings("unchecked")
     static <V> Collector<V, V> getInstance()
     {
       return (Collector<V, V>) INSTANCE;
@@ -3685,6 +3676,8 @@
        // +1 because the newly added entry is inserted before the least recently used one is removed.
       this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
       {
+        private static final long serialVersionUID = 1L;
+
         @Override
         protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
         {

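The anonymous LinkedHashMap above is the standard access-ordered LRU idiom: the third constructor argument (true) switches the map to access order, and removeEldestEntry() evicts once the size exceeds maxEntries. A self-contained sketch under those assumptions (LruCacheFactory is an illustrative name):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class LruCacheFactory
    {
      /** Access-ordered LinkedHashMap evicting the least recently used entry past maxEntries. */
      static <K, V> Map<K, V> newLruCache(final int maxEntries)
      {
        // maxEntries + 1 capacity with load factor 1.0 avoids rehashing: removeEldestEntry
        // evicts before the map ever holds more than maxEntries + 1 entries.
        return Collections.synchronizedMap(new LinkedHashMap<K, V>(maxEntries + 1, 1.0f, true)
        {
          private static final long serialVersionUID = 1L;

          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
          {
            return size() > maxEntries;
          }
        });
      }
    }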
--
Gitblit v1.10.0