mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
31.51.2016 571c3b4cb6142f4ef5eb41c3b8cf50dd9d6c9c81
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -16,12 +16,11 @@
package org.opends.server.backends.pluggable;
import static java.nio.channels.FileChannel.*;
import static java.nio.file.StandardOpenOption.*;
import static org.forgerock.util.Utils.*;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
import static java.nio.file.StandardOpenOption.*;
import java.io.Closeable;
import java.io.File;
@@ -80,6 +79,7 @@
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.spi.Indexer;
import org.forgerock.util.Reject;
@@ -107,7 +107,6 @@
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
@@ -817,8 +816,6 @@
    }
  }
  /** Default size for the off-heap memory dedicated to the phase one's buffer. */
  private static final int DEFAULT_OFFHEAP_SIZE = 700;
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  /** Min size of phase one buffer. */
@@ -1401,7 +1398,7 @@
   * Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
   * unlimited amount of data. This class uses double-buffering: data are firstly stored in a
   * {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a
   * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
   * {@link FileRegion}. Duplicate keys are reduced by a {@link Collector}.
   * {@link #put(ByteSequence, ByteSequence))} is thread-safe.
   * This class is used in phase-one. There is one {@link ExternalSortChunk} per
   * database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
@@ -1420,7 +1417,7 @@
    private final Collector<?, ByteString> phaseOneDeduplicator;
    private final Collector<?, ByteString> phaseTwoDeduplicator;
    /** Keep track of pending sorting tasks. */
    private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
    private final CompletionService<Region> sorter;
    /** Keep track of currently opened chunks. */
    private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>());
    /** Keep track of the number of chunks created. */
@@ -1473,36 +1470,57 @@
      {
        sortAndAppendChunkAsync(chunk);
      }
      final List<MeteredCursor<ByteString, ByteString>> cursors = new ArrayList<>();
      try
      {
        return new CollectorCursor<>(
            new CompositeCursor<ByteString, ByteString>(name,
                waitTasksTermination(sorter, nbSortedChunks.get()))
            {
              public void close()
              {
                super.close();
                if (OperatingSystem.isWindows())
                {
                  // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
                  // To prevent these not deleted files to waste space, we empty it.
                  try
                  {
                    channel.truncate(0);
                  }
                  catch (IOException e)
                  {
                    // This is best effort, it's safe to ignore the exception here.
                  }
                }
                closeSilently(channel);
              }
            }, (Collector<?, ByteString>) phaseTwoDeduplicator);
        final List<Region> regions = waitTasksTermination(sorter, nbSortedChunks.get());
        Collections.sort(regions); // Sort regions by their starting offsets.
        long mmapPosition = 0;
        // Create as big as possible memory are (handling 2Gb limit) and create as many cursors as regions from
        // these area.
        MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get(), Integer.MAX_VALUE));
        for (Region region : regions)
        {
          if ((region.offset + region.size) > (mmapPosition + Integer.MAX_VALUE))
          {
            // Handle the 2Gb ByteBuffer limit
            mmapPosition = region.offset;
            mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get() - mmapPosition, Integer.MAX_VALUE));
          }
          final ByteBuffer regionBuffer = mmap.duplicate();
          final int relativeRegionOffset = (int) (region.offset - mmapPosition);
          regionBuffer.position(relativeRegionOffset).limit(regionBuffer.position() + region.size);
          cursors.add(new FileRegion.Cursor(name, regionBuffer.slice()));
        }
      }
      catch (ExecutionException | InterruptedException e)
      catch (ExecutionException | InterruptedException | IOException e)
      {
        throw new StorageRuntimeException(e);
      }
      return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
      {
        @Override
        public void close()
        {
          super.close();
          if (OperatingSystem.isWindows())
          {
            // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
            // To prevent these not deleted files to waste space, we empty it.
            try
            {
              channel.truncate(0);
            }
            catch (IOException e)
            {
              // This is best effort, it's safe to ignore the exception here.
            }
          }
          closeSilently(channel);
        }
      }, (Collector<?, ByteString>) phaseTwoDeduplicator);
    }
    @Override
@@ -1528,26 +1546,46 @@
      final long startOffset = filePosition.getAndAdd(chunk.size());
      nbSortedChunks.incrementAndGet();
      sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>()
      sorter.submit(new Callable<Region>()
      {
        @Override
        public MeteredCursor<ByteString, ByteString> call() throws Exception
        public Region call() throws Exception
        {
          /*
           * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key
           * de-duplication performed by the CollectorCursor.
           * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
           * performed by the CollectorCursor.
           */
          final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
          final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
          final int regionSize;
          try (final SequentialCursor<ByteString, ByteString> source =
              new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
          {
            copyIntoChunk(source, persistentChunk);
            regionSize = region.write(source);
          }
          return persistentChunk.flip();
          return new Region(startOffset, regionSize);
        }
      });
    }
    /** Define a region inside a file. */
    private static final class Region implements Comparable<Region>
    {
      private final long offset;
      private final int size;
      Region(long offset, int size)
      {
        this.offset = offset;
        this.size = size;
      }
      @Override
      public int compareTo(Region o)
      {
        return Long.compare(offset, o.offset);
      }
    }
    /**
     * Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that
     * they can be cursored ascendantly. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation,
@@ -1780,13 +1818,9 @@
     * +------------+--------------+--------------+----------------+
     * </pre>
     */
    static final class FileRegionChunk implements Chunk
    static final class FileRegion
    {
      private final String metricName;
      private final FileChannel channel;
      private final long startOffset;
      private long size;
      private MappedByteBuffer mmapBuffer;
      private final MappedByteBuffer mmapBuffer;
      private final OutputStream mmapBufferOS = new OutputStream()
      {
        @Override
@@ -1796,65 +1830,33 @@
        }
      };
      FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException
      FileRegion(FileChannel channel, long startOffset, long size) throws IOException
      {
        this.metricName = name;
        this.channel = channel;
        this.startOffset = startOffset;
        if (size > 0)
        {
          // Make sure that the file is big-enough to encapsulate this memory-mapped region.
          channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1);
        }
        this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
        mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
      }
      @Override
      public boolean put(ByteSequence key, ByteSequence value)
      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
      {
        final int recordSize =
            PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
                .length();
        if (mmapBuffer.remaining() < recordSize)
        while (source.next())
        {
          // The regions is full
          return false;
        }
        try
        {
          final ByteSequence key = source.getKey();
          final ByteSequence value = source.getValue();
          PackedLong.writeCompactUnsigned(mmapBufferOS, key.length());
          PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
          key.copyTo(mmapBuffer);
          value.copyTo(mmapBuffer);
        }
        catch (IOException e)
        {
          throw new StorageRuntimeException(e);
        }
        key.copyTo(mmapBuffer);
        value.copyTo(mmapBuffer);
        return true;
      }
      @Override
      public long size()
      {
        return mmapBuffer == null ? size : mmapBuffer.position();
      }
      @Override
      public MeteredCursor<ByteString, ByteString> flip()
      {
        size = mmapBuffer.position();
        mmapBuffer = null;
        return new FileRegionChunkCursor(startOffset, size);
        return mmapBuffer.position();
      }
      /** Cursor through the specific memory-mapped file's region. */
      private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
      static final class Cursor implements MeteredCursor<ByteString, ByteString>
      {
        private final long regionOffset;
        private final long regionSize;
        private final InputStream asInputStream = new InputStream()
        {
          @Override
@@ -1863,19 +1865,19 @@
            return region.get() & 0xFF;
          }
        };
        private final String metricName;
        private ByteBuffer region;
        private ByteString key, value;
        FileRegionChunkCursor(long regionOffset, long regionSize)
        Cursor(String metricName, ByteBuffer region)
        {
          this.regionOffset = regionOffset;
          this.regionSize = regionSize;
          this.metricName = metricName;
          this.region = region;
        }
        @Override
        public boolean next()
        {
          ensureRegionIsMemoryMapped();
          if (!region.hasRemaining())
          {
            key = value = null;
@@ -1904,22 +1906,6 @@
          return true;
        }
        private void ensureRegionIsMemoryMapped()
        {
          if (region == null)
          {
            // Because mmap() regions are a counted and limited resources, we create them lazily.
            try
            {
              region = channel.map(MapMode.READ_ONLY, regionOffset, regionSize);
            }
            catch (IOException e)
            {
              throw new IllegalStateException(e);
            }
          }
        }
        @Override
        public boolean isDefined()
        {
@@ -1962,13 +1948,13 @@
        @Override
        public long getNbBytesRead()
        {
          return region == null ? 0 : region.position();
          return region.position();
        }
        @Override
        public long getNbBytesTotal()
        {
          return regionSize;
          return region.limit();
        }
      }
    }
@@ -2263,7 +2249,10 @@
    long nbRecords = 0;
    while (source.next())
    {
      destination.put(source.getKey(), source.getValue());
      if (!destination.put(source.getKey(), source.getValue()))
      {
        throw new IllegalStateException("Destination chunk is full");
      }
      nbRecords++;
    }
    return nbRecords;
@@ -2815,6 +2804,7 @@
    }
    /** Off-heap buffer using Unsafe memory access. */
    @SuppressWarnings("restriction")
    static final class OffHeapBuffer implements Buffer
    {
      private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
@@ -3140,6 +3130,7 @@
  {
    private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>();
    @SuppressWarnings("unchecked")
    static <V> Collector<V, V> getInstance()
    {
      return (Collector<V, V>) INSTANCE;
@@ -3685,6 +3676,8 @@
      // +1 because newly added entry is added before the least recently one is removed.
      this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
      {
        private static final long serialVersionUID = 1L;
        @Override
        protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
        {
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -20,7 +20,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,7 +52,7 @@
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CompositeCursor;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegionChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegion;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.MeteredCursor;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.UniqueValueCollector;
@@ -98,7 +100,6 @@
  }
  @Test
  @SuppressWarnings(value = "resource")
  public void testCollectCursor()
  {
    final MeteredCursor<ByteString, ByteString> source =
@@ -118,7 +119,6 @@
  }
  @Test
  @SuppressWarnings(value = "resource")
  public void testCompositeCursor()
  {
    final Collection<MeteredCursor<ByteString, ByteString>> sources = new ArrayList<>();
@@ -150,7 +150,7 @@
  }
  @Test
  @SuppressWarnings(value = { "unchecked", "resource" })
  @SuppressWarnings(value = "unchecked")
  public void testCounterCollector()
  {
    final MeteredCursor<String, ByteString> source = cursorOf(
@@ -172,7 +172,7 @@
  }
  @Test
  @SuppressWarnings(value = { "unchecked", "resource" })
  @SuppressWarnings(value = "unchecked")
  public void testEntryIDSetCollector()
  {
    final MeteredCursor<String, ByteString> source = cursorOf(
@@ -208,7 +208,6 @@
  }
  @Test
  @SuppressWarnings(value = "resource")
  public void testUniqueValueCollectorAcceptUniqueValues()
  {
    final MeteredCursor<ByteString, ByteString> source =
@@ -224,7 +223,6 @@
  }
  @Test(expectedExceptions = IllegalArgumentException.class)
  @SuppressWarnings(value = "resource")
  public void testUniqueValueCollectorDoesNotAcceptMultipleValues()
  {
    final MeteredCursor<ByteString, ByteString> source =
@@ -259,8 +257,7 @@
  }
  @Test
  @SuppressWarnings("resource")
  public void testFileRegionChunk() throws Exception
  public void testFileRegion() throws Exception
  {
    final int NB_REGION = 10;
    final int NB_RECORDS = 15;
@@ -285,25 +282,29 @@
    }
    // Copy content into file regions
    final List<Chunk> regionChunks = new ArrayList<>(memoryChunks.size());
    final List<Pair<Long, Integer>> regions = new ArrayList<>(memoryChunks.size());
    long offset = 0;
    for (Chunk source : memoryChunks)
    {
      final Chunk region = new FileRegionChunk("test", channel, offset, source.size());
      final FileRegion region = new FileRegion(channel, offset, source.size());
      try(final SequentialCursor<ByteString, ByteString> cursor = source.flip()) {
        regions.add(Pair.of(offset, region.write(cursor)));
      }
      offset += source.size();
      populate(region, toPairs(source.flip()));
      regionChunks.add(region);
    }
    // Verify file regions contents
    int regionNumber = 0;
    for (Chunk region : regionChunks)
    final MappedByteBuffer buffer = channel.map(MapMode.READ_ONLY, 0, offset);
    for (Pair<Long, Integer> region : regions)
    {
      assertThat(toPairs(region.flip())).containsExactlyElementsOf(content(contents[regionNumber]));
      buffer.position(region.getFirst().intValue()).limit(buffer.position() + region.getSecond());
      assertThat(toPairs(new FileRegion.Cursor("test", buffer.slice()))).containsExactlyElementsOf(content(contents[regionNumber]));
      regionNumber++;
    }
  }
  @SuppressWarnings("unchecked")
  @Test
  public void testExternalSortChunk() throws Exception
  {