mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
31.51.2016 571c3b4cb6142f4ef5eb41c3b8cf50dd9d6c9c81
OPENDJ-2631: OOME error while importing 100M entries.

Second phase needs all the cursor to be opened in order to perform the
merge. Since each cursor has its own memory mapped region, this can
create a huge number of those.
We're now memory mapping the biggest possible portion of file and share
it with the underlying cursors.
2 files modified
206 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java 175 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java 31 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -16,12 +16,11 @@
package org.opends.server.backends.pluggable;
import static java.nio.channels.FileChannel.*;
import static java.nio.file.StandardOpenOption.*;
import static org.forgerock.util.Utils.*;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
import static java.nio.file.StandardOpenOption.*;
import java.io.Closeable;
import java.io.File;
@@ -80,6 +79,7 @@
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.spi.Indexer;
import org.forgerock.util.Reject;
@@ -107,7 +107,6 @@
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
@@ -817,8 +816,6 @@
    }
  }
  /** Default size for the off-heap memory dedicated to the phase one's buffer. */
  private static final int DEFAULT_OFFHEAP_SIZE = 700;
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  /** Min size of phase one buffer. */
@@ -1401,7 +1398,7 @@
   * Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
   * unlimited amount of data. This class uses double-buffering: data are firstly stored in a
   * {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a
   * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
   * {@link FileRegion}. Duplicate keys are reduced by a {@link Collector}.
   * {@link #put(ByteSequence, ByteSequence))} is thread-safe.
   * This class is used in phase-one. There is one {@link ExternalSortChunk} per
   * database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
@@ -1420,7 +1417,7 @@
    private final Collector<?, ByteString> phaseOneDeduplicator;
    private final Collector<?, ByteString> phaseTwoDeduplicator;
    /** Keep track of pending sorting tasks. */
    private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
    private final CompletionService<Region> sorter;
    /** Keep track of currently opened chunks. */
    private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>());
    /** Keep track of the number of chunks created. */
@@ -1473,12 +1470,38 @@
      {
        sortAndAppendChunkAsync(chunk);
      }
      final List<MeteredCursor<ByteString, ByteString>> cursors = new ArrayList<>();
      try
      {
        return new CollectorCursor<>(
            new CompositeCursor<ByteString, ByteString>(name,
                waitTasksTermination(sorter, nbSortedChunks.get()))
        final List<Region> regions = waitTasksTermination(sorter, nbSortedChunks.get());
        Collections.sort(regions); // Sort regions by their starting offsets.
        long mmapPosition = 0;
        // Create as big as possible memory are (handling 2Gb limit) and create as many cursors as regions from
        // these area.
        MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get(), Integer.MAX_VALUE));
        for (Region region : regions)
            {
          if ((region.offset + region.size) > (mmapPosition + Integer.MAX_VALUE))
          {
            // Handle the 2Gb ByteBuffer limit
            mmapPosition = region.offset;
            mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get() - mmapPosition, Integer.MAX_VALUE));
          }
          final ByteBuffer regionBuffer = mmap.duplicate();
          final int relativeRegionOffset = (int) (region.offset - mmapPosition);
          regionBuffer.position(relativeRegionOffset).limit(regionBuffer.position() + region.size);
          cursors.add(new FileRegion.Cursor(name, regionBuffer.slice()));
        }
      }
      catch (ExecutionException | InterruptedException | IOException e)
      {
        throw new StorageRuntimeException(e);
      }
      return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
      {
        @Override
              public void close()
              {
                super.close();
@@ -1499,11 +1522,6 @@
              }
            }, (Collector<?, ByteString>) phaseTwoDeduplicator);
      }
      catch (ExecutionException | InterruptedException e)
      {
        throw new StorageRuntimeException(e);
      }
    }
    @Override
    public long size()
@@ -1528,26 +1546,46 @@
      final long startOffset = filePosition.getAndAdd(chunk.size());
      nbSortedChunks.incrementAndGet();
      sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>()
      sorter.submit(new Callable<Region>()
      {
        @Override
        public MeteredCursor<ByteString, ByteString> call() throws Exception
        public Region call() throws Exception
        {
          /*
           * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key
           * de-duplication performed by the CollectorCursor.
           * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
           * performed by the CollectorCursor.
           */
          final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
          final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
          final int regionSize;
          try (final SequentialCursor<ByteString, ByteString> source =
              new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
          {
            copyIntoChunk(source, persistentChunk);
            regionSize = region.write(source);
          }
          return persistentChunk.flip();
          return new Region(startOffset, regionSize);
        }
      });
    }
    /** Define a region inside a file. */
    private static final class Region implements Comparable<Region>
    {
      private final long offset;
      private final int size;
      Region(long offset, int size)
      {
        this.offset = offset;
        this.size = size;
      }
      @Override
      public int compareTo(Region o)
      {
        return Long.compare(offset, o.offset);
      }
    }
    /**
     * Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that
     * they can be cursored ascendantly. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation,
@@ -1780,13 +1818,9 @@
     * +------------+--------------+--------------+----------------+
     * </pre>
     */
    static final class FileRegionChunk implements Chunk
    static final class FileRegion
    {
      private final String metricName;
      private final FileChannel channel;
      private final long startOffset;
      private long size;
      private MappedByteBuffer mmapBuffer;
      private final MappedByteBuffer mmapBuffer;
      private final OutputStream mmapBufferOS = new OutputStream()
      {
        @Override
@@ -1796,65 +1830,33 @@
        }
      };
      FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException
      FileRegion(FileChannel channel, long startOffset, long size) throws IOException
      {
        this.metricName = name;
        this.channel = channel;
        this.startOffset = startOffset;
        if (size > 0)
        {
          // Make sure that the file is big-enough to encapsulate this memory-mapped region.
          channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1);
        }
        this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
        mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
      }
      @Override
      public boolean put(ByteSequence key, ByteSequence value)
      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
      {
        final int recordSize =
            PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
                .length();
        if (mmapBuffer.remaining() < recordSize)
        while (source.next())
        {
          // The regions is full
          return false;
        }
        try
        {
          final ByteSequence key = source.getKey();
          final ByteSequence value = source.getValue();
          PackedLong.writeCompactUnsigned(mmapBufferOS, key.length());
          PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
        }
        catch (IOException e)
        {
          throw new StorageRuntimeException(e);
        }
        key.copyTo(mmapBuffer);
        value.copyTo(mmapBuffer);
        return true;
      }
      @Override
      public long size()
      {
        return mmapBuffer == null ? size : mmapBuffer.position();
      }
      @Override
      public MeteredCursor<ByteString, ByteString> flip()
      {
        size = mmapBuffer.position();
        mmapBuffer = null;
        return new FileRegionChunkCursor(startOffset, size);
        return mmapBuffer.position();
      }
      /** Cursor through the specific memory-mapped file's region. */
      private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
      static final class Cursor implements MeteredCursor<ByteString, ByteString>
      {
        private final long regionOffset;
        private final long regionSize;
        private final InputStream asInputStream = new InputStream()
        {
          @Override
@@ -1863,19 +1865,19 @@
            return region.get() & 0xFF;
          }
        };
        private final String metricName;
        private ByteBuffer region;
        private ByteString key, value;
        FileRegionChunkCursor(long regionOffset, long regionSize)
        Cursor(String metricName, ByteBuffer region)
        {
          this.regionOffset = regionOffset;
          this.regionSize = regionSize;
          this.metricName = metricName;
          this.region = region;
        }
        @Override
        public boolean next()
        {
          ensureRegionIsMemoryMapped();
          if (!region.hasRemaining())
          {
            key = value = null;
@@ -1904,22 +1906,6 @@
          return true;
        }
        private void ensureRegionIsMemoryMapped()
        {
          if (region == null)
          {
            // Because mmap() regions are a counted and limited resources, we create them lazily.
            try
            {
              region = channel.map(MapMode.READ_ONLY, regionOffset, regionSize);
            }
            catch (IOException e)
            {
              throw new IllegalStateException(e);
            }
          }
        }
        @Override
        public boolean isDefined()
        {
@@ -1962,13 +1948,13 @@
        @Override
        public long getNbBytesRead()
        {
          return region == null ? 0 : region.position();
          return region.position();
        }
        @Override
        public long getNbBytesTotal()
        {
          return regionSize;
          return region.limit();
        }
      }
    }
@@ -2263,7 +2249,10 @@
    long nbRecords = 0;
    while (source.next())
    {
      destination.put(source.getKey(), source.getValue());
      if (!destination.put(source.getKey(), source.getValue()))
      {
        throw new IllegalStateException("Destination chunk is full");
      }
      nbRecords++;
    }
    return nbRecords;
@@ -2815,6 +2804,7 @@
    }
    /** Off-heap buffer using Unsafe memory access. */
    @SuppressWarnings("restriction")
    static final class OffHeapBuffer implements Buffer
    {
      private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
@@ -3140,6 +3130,7 @@
  {
    private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>();
    @SuppressWarnings("unchecked")
    static <V> Collector<V, V> getInstance()
    {
      return (Collector<V, V>) INSTANCE;
@@ -3685,6 +3676,8 @@
      // +1 because newly added entry is added before the least recently one is removed.
      this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
      {
        private static final long serialVersionUID = 1L;
        @Override
        protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
        {
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -20,7 +20,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,7 +52,7 @@
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CompositeCursor;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegionChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegion;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.MeteredCursor;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.UniqueValueCollector;
@@ -98,7 +100,6 @@
  }
  @Test
  @SuppressWarnings(value = "resource")
  public void testCollectCursor()
  {
    final MeteredCursor<ByteString, ByteString> source =
@@ -118,7 +119,6 @@
  }
  @Test
  @SuppressWarnings(value = "resource")
  public void testCompositeCursor()
  {
    final Collection<MeteredCursor<ByteString, ByteString>> sources = new ArrayList<>();
@@ -150,7 +150,7 @@
  }
  @Test
  @SuppressWarnings(value = { "unchecked", "resource" })
  @SuppressWarnings(value = "unchecked")
  public void testCounterCollector()
  {
    final MeteredCursor<String, ByteString> source = cursorOf(
@@ -172,7 +172,7 @@
  }
  @Test
  @SuppressWarnings(value = { "unchecked", "resource" })
  @SuppressWarnings(value = "unchecked")
  public void testEntryIDSetCollector()
  {
    final MeteredCursor<String, ByteString> source = cursorOf(
@@ -208,7 +208,6 @@
  }
  @Test
  @SuppressWarnings(value = "resource")
  public void testUniqueValueCollectorAcceptUniqueValues()
  {
    final MeteredCursor<ByteString, ByteString> source =
@@ -224,7 +223,6 @@
  }
  @Test(expectedExceptions = IllegalArgumentException.class)
  @SuppressWarnings(value = "resource")
  public void testUniqueValueCollectorDoesNotAcceptMultipleValues()
  {
    final MeteredCursor<ByteString, ByteString> source =
@@ -259,8 +257,7 @@
  }
  @Test
  @SuppressWarnings("resource")
  public void testFileRegionChunk() throws Exception
  public void testFileRegion() throws Exception
  {
    final int NB_REGION = 10;
    final int NB_RECORDS = 15;
@@ -285,25 +282,29 @@
    }
    // Copy content into file regions
    final List<Chunk> regionChunks = new ArrayList<>(memoryChunks.size());
    final List<Pair<Long, Integer>> regions = new ArrayList<>(memoryChunks.size());
    long offset = 0;
    for (Chunk source : memoryChunks)
    {
      final Chunk region = new FileRegionChunk("test", channel, offset, source.size());
      final FileRegion region = new FileRegion(channel, offset, source.size());
      try(final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
        regions.add(Pair.of(offset, region.write(cursor)));
      }
      offset += source.size();
      populate(region, toPairs(source.flip()));
      regionChunks.add(region);
    }
    // Verify file regions contents
    int regionNumber = 0;
    for (Chunk region : regionChunks)
    final MappedByteBuffer buffer = channel.map(MapMode.READ_ONLY, 0, offset);
    for (Pair<Long, Integer> region : regions)
    {
      assertThat(toPairs(region.flip())).containsExactlyElementsOf(content(contents[regionNumber]));
      buffer.position(region.getFirst().intValue()).limit(buffer.position() + region.getSecond());
      assertThat(toPairs(new FileRegion.Cursor("test", buffer.slice()))).containsExactlyElementsOf(content(contents[regionNumber]));
      regionNumber++;
    }
  }
  @SuppressWarnings("unchecked")
  @Test
  public void testExternalSortChunk() throws Exception
  {