From 571c3b4cb6142f4ef5eb41c3b8cf50dd9d6c9c81 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Tue, 12 Apr 2016 14:43:12 +0000
Subject: [PATCH] OPENDJ-2631: OOME while importing 100M entries.

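The phase-one external sort flushed each sorted in-memory chunk into a
FileRegionChunk whose cursor lazily created its own memory mapping of
the scratch file, so the number of concurrently mapped regions grew
with the number of chunks. mmap() regions are a counted and limited
resource, and on very large imports (100M entries) this per-chunk
mapping is the likely cause of the reported OutOfMemoryError.

FileRegionChunk is replaced by a leaner FileRegion: sort tasks now
return plain (offset, size) Region descriptors, and phase two maps the
scratch file read-only in as few areas as possible (each capped at
Integer.MAX_VALUE bytes, since a ByteBuffer cannot address more than
2 GB), slicing one buffer per region out of the shared mappings. The
now unused DEFAULT_OFFHEAP_SIZE constant is removed, and
copyIntoChunk() fails fast if the destination chunk rejects a record.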
---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java     |  211 ++++++++++++++++++++---------------------
 opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java |   31 +++---
 2 files changed, 118 insertions(+), 124 deletions(-)

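Notes:

How the 2 GB ByteBuffer limit is handled, as a minimal standalone
sketch: regions sorted by offset share as few read-only mappings as
possible, and each region becomes an independent slice of the current
mapping. RegionSlicer and its parameters are names invented for this
sketch, not part of the patch.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;
    import java.util.ArrayList;
    import java.util.List;

    final class RegionSlicer
    {
      /**
       * Returns one read-only buffer per (offset, size) region, sharing as
       * few memory mappings as possible. Regions must be sorted by offset
       * and each region size must fit in an int, as in the patch.
       */
      static List<ByteBuffer> slice(FileChannel channel, long fileSize,
          long[] offsets, int[] sizes) throws IOException
      {
        final List<ByteBuffer> slices = new ArrayList<>();
        long mapPosition = 0;
        MappedByteBuffer map =
            channel.map(MapMode.READ_ONLY, 0, Math.min(fileSize, Integer.MAX_VALUE));
        for (int i = 0; i < offsets.length; i++)
        {
          if (offsets[i] + sizes[i] > mapPosition + Integer.MAX_VALUE)
          {
            // Region crosses the current 2 GB window: re-map starting at it.
            mapPosition = offsets[i];
            map = channel.map(MapMode.READ_ONLY, mapPosition,
                Math.min(fileSize - mapPosition, Integer.MAX_VALUE));
          }
          // duplicate() gives each region an independent position/limit.
          final ByteBuffer region = map.duplicate();
          final int start = (int) (offsets[i] - mapPosition);
          region.position(start).limit(start + sizes[i]);
          slices.add(region.slice());
        }
        return slices;
      }
    }

The record layout written by FileRegion.write() and read back by its
Cursor is a pair of compact unsigned lengths followed by the key and
value bytes. A minimal sketch of that framing, using a plain
LEB128-style varint as a stand-in for OpenDJ's PackedLong encoding
(the exact PackedLong byte format is assumed to differ; only the
record shape is illustrated):

    import java.nio.ByteBuffer;

    final class RecordFraming
    {
      // 7 payload bits per byte; a set high bit means "more bytes follow".
      static void writeVarInt(ByteBuffer out, int v)
      {
        while ((v & ~0x7F) != 0)
        {
          out.put((byte) ((v & 0x7F) | 0x80));
          v >>>= 7;
        }
        out.put((byte) v);
      }

      static int readVarInt(ByteBuffer in)
      {
        int v = 0;
        int shift = 0;
        byte b;
        do
        {
          b = in.get();
          v |= (b & 0x7F) << shift;
          shift += 7;
        }
        while ((b & 0x80) != 0);
        return v;
      }

      // A record is: varint(keyLen), varint(valueLen), key bytes, value bytes.
      static void writeRecord(ByteBuffer out, byte[] key, byte[] value)
      {
        writeVarInt(out, key.length);
        writeVarInt(out, value.length);
        out.put(key).put(value);
      }

      static byte[][] readRecord(ByteBuffer in)
      {
        final byte[] key = new byte[readVarInt(in)];
        final byte[] value = new byte[readVarInt(in)];
        in.get(key);
        in.get(value);
        return new byte[][] { key, value };
      }
    }
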
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 40023b0..2d08f3d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -16,12 +16,11 @@
 package org.opends.server.backends.pluggable;
 
 import static java.nio.channels.FileChannel.*;
-
+import static java.nio.file.StandardOpenOption.*;
 import static org.forgerock.util.Utils.*;
 import static org.opends.messages.BackendMessages.*;
 import static org.opends.server.util.DynamicConstants.*;
 import static org.opends.server.util.StaticUtils.*;
-import static java.nio.file.StandardOpenOption.*;
 
 import java.io.Closeable;
 import java.io.File;
@@ -80,6 +79,7 @@
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.spi.Indexer;
 import org.forgerock.util.Reject;
@@ -107,7 +107,6 @@
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ServerContext;
-import org.forgerock.opendj.ldap.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.InitializationException;
@@ -817,8 +816,6 @@
     }
   }
 
-  /** Default size for the off-heap memory dedicated to the phase one's buffer. */
-  private static final int DEFAULT_OFFHEAP_SIZE = 700;
   /** Max size of phase one buffer. */
   private static final int MAX_BUFFER_SIZE = 2 * MB;
   /** Min size of phase one buffer. */
@@ -1401,7 +1398,7 @@
    * Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
    * unlimited amount of data. This class uses double-buffering: data are firstly stored in a
    * {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a
-   * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
+   * {@link FileRegion}. Duplicate keys are reduced by a {@link Collector}.
   * {@link #put(ByteSequence, ByteSequence)} is thread-safe.
    * This class is used in phase-one. There is one {@link ExternalSortChunk} per
    * database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
@@ -1420,7 +1417,7 @@
     private final Collector<?, ByteString> phaseOneDeduplicator;
     private final Collector<?, ByteString> phaseTwoDeduplicator;
     /** Keep track of pending sorting tasks. */
-    private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
+    private final CompletionService<Region> sorter;
     /** Keep track of currently opened chunks. */
     private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>());
     /** Keep track of the number of chunks created. */
@@ -1473,36 +1470,57 @@
       {
         sortAndAppendChunkAsync(chunk);
       }
+
+      final List<MeteredCursor<ByteString, ByteString>> cursors = new ArrayList<>();
       try
       {
-        return new CollectorCursor<>(
-            new CompositeCursor<ByteString, ByteString>(name,
-                waitTasksTermination(sorter, nbSortedChunks.get()))
-            {
-              public void close()
-              {
-                super.close();
-                if (OperatingSystem.isWindows())
-                {
-                  // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
-                  // To prevent these not deleted files to waste space, we empty it.
-                  try
-                  {
-                    channel.truncate(0);
-                  }
-                  catch (IOException e)
-                  {
-                    // This is best effort, it's safe to ignore the exception here.
-                  }
-                }
-                closeSilently(channel);
-              }
-            }, (Collector<?, ByteString>) phaseTwoDeduplicator);
+        final List<Region> regions = waitTasksTermination(sorter, nbSortedChunks.get());
+        Collections.sort(regions); // Sort regions by their starting offsets.
+        long mmapPosition = 0;
+        // Map areas of the file as large as possible (handling the 2 GB ByteBuffer limit) and create as many
+        // cursors as there are regions within these areas.
+        MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get(), Integer.MAX_VALUE));
+        for (Region region : regions)
+        {
+          if ((region.offset + region.size) > (mmapPosition + Integer.MAX_VALUE))
+          {
+            // Handle the 2 GB ByteBuffer limit.
+            mmapPosition = region.offset;
+            mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get() - mmapPosition, Integer.MAX_VALUE));
+          }
+          final ByteBuffer regionBuffer = mmap.duplicate();
+          final int relativeRegionOffset = (int) (region.offset - mmapPosition);
+          regionBuffer.position(relativeRegionOffset).limit(regionBuffer.position() + region.size);
+          cursors.add(new FileRegion.Cursor(name, regionBuffer.slice()));
+        }
       }
-      catch (ExecutionException | InterruptedException e)
+      catch (ExecutionException | InterruptedException | IOException e)
       {
         throw new StorageRuntimeException(e);
       }
+
+      return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
+      {
+        @Override
+        public void close()
+        {
+          super.close();
+          if (OperatingSystem.isWindows())
+          {
+            // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
+            // To prevent these undeleted files from wasting space, we truncate them.
+            try
+            {
+              channel.truncate(0);
+            }
+            catch (IOException e)
+            {
+              // This is best-effort; it is safe to ignore the exception here.
+            }
+          }
+          closeSilently(channel);
+        }
+      }, (Collector<?, ByteString>) phaseTwoDeduplicator);
     }
 
     @Override
@@ -1528,26 +1546,46 @@
       final long startOffset = filePosition.getAndAdd(chunk.size());
       nbSortedChunks.incrementAndGet();
 
-      sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>()
+      sorter.submit(new Callable<Region>()
       {
         @Override
-        public MeteredCursor<ByteString, ByteString> call() throws Exception
+        public Region call() throws Exception
         {
           /*
-           * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key
-           * de-duplication performed by the CollectorCursor.
+           * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
+           * performed by the CollectorCursor.
            */
-          final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
+          final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
+          final int regionSize;
           try (final SequentialCursor<ByteString, ByteString> source =
               new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
           {
-            copyIntoChunk(source, persistentChunk);
+            regionSize = region.write(source);
           }
-          return persistentChunk.flip();
+          return new Region(startOffset, regionSize);
         }
       });
     }
 
+    /** Defines a region (offset, size) inside a file. */
+    private static final class Region implements Comparable<Region>
+    {
+      private final long offset;
+      private final int size;
+
+      Region(long offset, int size)
+      {
+        this.offset = offset;
+        this.size = size;
+      }
+
+      @Override
+      public int compareTo(Region o)
+      {
+        return Long.compare(offset, o.offset);
+      }
+    }
+
     /**
      * Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that
      * they can be cursored ascendantly. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation,
@@ -1780,13 +1818,9 @@
      * +------------+--------------+--------------+----------------+
      * </pre>
      */
-    static final class FileRegionChunk implements Chunk
+    static final class FileRegion
     {
-      private final String metricName;
-      private final FileChannel channel;
-      private final long startOffset;
-      private long size;
-      private MappedByteBuffer mmapBuffer;
+      private final MappedByteBuffer mmapBuffer;
       private final OutputStream mmapBufferOS = new OutputStream()
       {
         @Override
@@ -1796,65 +1830,33 @@
         }
       };
 
-      FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException
+      FileRegion(FileChannel channel, long startOffset, long size) throws IOException
       {
-        this.metricName = name;
-        this.channel = channel;
-        this.startOffset = startOffset;
         if (size > 0)
         {
           // Make sure that the file is big-enough to encapsulate this memory-mapped region.
           channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1);
         }
-        this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
+        mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
       }
 
-      @Override
-      public boolean put(ByteSequence key, ByteSequence value)
+      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
       {
-        final int recordSize =
-            PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
-                .length();
-        if (mmapBuffer.remaining() < recordSize)
+        while (source.next())
         {
-          // The regions is full
-          return false;
-        }
-
-        try
-        {
+          final ByteSequence key = source.getKey();
+          final ByteSequence value = source.getValue();
           PackedLong.writeCompactUnsigned(mmapBufferOS, key.length());
           PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
+          key.copyTo(mmapBuffer);
+          value.copyTo(mmapBuffer);
         }
-        catch (IOException e)
-        {
-          throw new StorageRuntimeException(e);
-        }
-        key.copyTo(mmapBuffer);
-        value.copyTo(mmapBuffer);
-
-        return true;
-      }
-
-      @Override
-      public long size()
-      {
-        return mmapBuffer == null ? size : mmapBuffer.position();
-      }
-
-      @Override
-      public MeteredCursor<ByteString, ByteString> flip()
-      {
-        size = mmapBuffer.position();
-        mmapBuffer = null;
-        return new FileRegionChunkCursor(startOffset, size);
+        return mmapBuffer.position();
       }
 
       /** Cursor through the specific memory-mapped file's region. */
-      private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
+      static final class Cursor implements MeteredCursor<ByteString, ByteString>
       {
-        private final long regionOffset;
-        private final long regionSize;
         private final InputStream asInputStream = new InputStream()
         {
           @Override
@@ -1863,19 +1865,19 @@
             return region.get() & 0xFF;
           }
         };
+        private final String metricName;
         private ByteBuffer region;
         private ByteString key, value;
 
-        FileRegionChunkCursor(long regionOffset, long regionSize)
+        Cursor(String metricName, ByteBuffer region)
         {
-          this.regionOffset = regionOffset;
-          this.regionSize = regionSize;
+          this.metricName = metricName;
+          this.region = region;
         }
 
         @Override
         public boolean next()
         {
-          ensureRegionIsMemoryMapped();
           if (!region.hasRemaining())
           {
             key = value = null;
@@ -1904,22 +1906,6 @@
           return true;
         }
 
-        private void ensureRegionIsMemoryMapped()
-        {
-          if (region == null)
-          {
-            // Because mmap() regions are a counted and limited resources, we create them lazily.
-            try
-            {
-              region = channel.map(MapMode.READ_ONLY, regionOffset, regionSize);
-            }
-            catch (IOException e)
-            {
-              throw new IllegalStateException(e);
-            }
-          }
-        }
-
         @Override
         public boolean isDefined()
         {
@@ -1962,13 +1948,13 @@
         @Override
         public long getNbBytesRead()
         {
-          return region == null ? 0 : region.position();
+          return region.position();
         }
 
         @Override
         public long getNbBytesTotal()
         {
-          return regionSize;
+          return region.limit();
         }
       }
     }
@@ -2263,7 +2249,10 @@
     long nbRecords = 0;
     while (source.next())
     {
-      destination.put(source.getKey(), source.getValue());
+      if (!destination.put(source.getKey(), source.getValue()))
+      {
+        throw new IllegalStateException("Destination chunk is full");
+      }
       nbRecords++;
     }
     return nbRecords;
@@ -2815,6 +2804,7 @@
     }
 
     /** Off-heap buffer using Unsafe memory access. */
+    @SuppressWarnings("restriction")
     static final class OffHeapBuffer implements Buffer
     {
       private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
@@ -3140,6 +3130,7 @@
   {
     private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>();
 
+    @SuppressWarnings("unchecked")
     static <V> Collector<V, V> getInstance()
     {
       return (Collector<V, V>) INSTANCE;
@@ -3685,6 +3676,8 @@
       // +1 because newly added entry is added before the least recently one is removed.
       this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
       {
+        private static final long serialVersionUID = 1L;
+
         @Override
         protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
         {
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index 053c832..4b031cb 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -20,7 +20,9 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,7 +52,7 @@
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CompositeCursor;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegionChunk;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegion;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.MeteredCursor;
 import org.opends.server.backends.pluggable.OnDiskMergeImporter.UniqueValueCollector;
@@ -98,7 +100,6 @@
   }
 
   @Test
-  @SuppressWarnings(value = "resource")
   public void testCollectCursor()
   {
     final MeteredCursor<ByteString, ByteString> source =
@@ -118,7 +119,6 @@
   }
 
   @Test
-  @SuppressWarnings(value = "resource")
   public void testCompositeCursor()
   {
     final Collection<MeteredCursor<ByteString, ByteString>> sources = new ArrayList<>();
@@ -150,7 +150,7 @@
   }
 
   @Test
-  @SuppressWarnings(value = { "unchecked", "resource" })
+  @SuppressWarnings(value = "unchecked")
   public void testCounterCollector()
   {
     final MeteredCursor<String, ByteString> source = cursorOf(
@@ -172,7 +172,7 @@
   }
 
   @Test
-  @SuppressWarnings(value = { "unchecked", "resource" })
+  @SuppressWarnings(value = "unchecked")
   public void testEntryIDSetCollector()
   {
     final MeteredCursor<String, ByteString> source = cursorOf(
@@ -208,7 +208,6 @@
   }
 
   @Test
-  @SuppressWarnings(value = "resource")
   public void testUniqueValueCollectorAcceptUniqueValues()
   {
     final MeteredCursor<ByteString, ByteString> source =
@@ -224,7 +223,6 @@
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
-  @SuppressWarnings(value = "resource")
   public void testUniqueValueCollectorDoesNotAcceptMultipleValues()
   {
     final MeteredCursor<ByteString, ByteString> source =
@@ -259,8 +257,7 @@
   }
 
   @Test
-  @SuppressWarnings("resource")
-  public void testFileRegionChunk() throws Exception
+  public void testFileRegion() throws Exception
   {
     final int NB_REGION = 10;
     final int NB_RECORDS = 15;
@@ -285,25 +282,29 @@
     }
 
     // Copy content into file regions
-    final List<Chunk> regionChunks = new ArrayList<>(memoryChunks.size());
+    final List<Pair<Long, Integer>> regions = new ArrayList<>(memoryChunks.size());
     long offset = 0;
     for (Chunk source : memoryChunks)
     {
-      final Chunk region = new FileRegionChunk("test", channel, offset, source.size());
+      final FileRegion region = new FileRegion(channel, offset, source.size());
+      try (final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
+        regions.add(Pair.of(offset, region.write(cursor)));
+      }
       offset += source.size();
-      populate(region, toPairs(source.flip()));
-      regionChunks.add(region);
     }
 
     // Verify file regions contents
     int regionNumber = 0;
-    for (Chunk region : regionChunks)
+    final MappedByteBuffer buffer = channel.map(MapMode.READ_ONLY, 0, offset);
+    for (Pair<Long, Integer> region : regions)
     {
-      assertThat(toPairs(region.flip())).containsExactlyElementsOf(content(contents[regionNumber]));
+      buffer.position(region.getFirst().intValue()).limit(buffer.position() + region.getSecond());
+      assertThat(toPairs(new FileRegion.Cursor("test", buffer.slice()))).containsExactlyElementsOf(content(contents[regionNumber]));
       regionNumber++;
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testExternalSortChunk() throws Exception
   {

--
Gitblit v1.10.0