mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
14.07.2016 acac2a13ebe79697f86272cacd61541955c9d343
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -27,7 +27,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -94,7 +93,7 @@
import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter;
import org.opends.server.backends.pluggable.DN2ID.TreeVisitor;
import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -117,8 +116,7 @@
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
// @Checkstyle:ignore
import sun.misc.Unsafe;
import net.jcip.annotations.NotThreadSafe;
/**
 * Imports LDIF data contained in files into the database. Because of the B-Tree structure used in backend, import is
@@ -170,41 +168,22 @@
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
    {
      final int threadCount =
          importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors()
              : importConfig.getThreadCount();
          importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
      final int indexCount = getIndexCount();
      final int nbBuffer = threadCount * indexCount * 2;
      final int bufferSize;
      if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
      {
        final long offHeapSize = importConfig.getOffHeapSize();
        bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
        if (bufferSize < MIN_BUFFER_SIZE)
        {
          // Not enough memory.
          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
        }
        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
      }
      else
      {
        bufferSize = computeBufferSize(nbBuffer);
        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      }
      final int nbRequiredBuffers = threadCount * indexCount * 2;
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
      final long startTime = System.currentTimeMillis();
      final OnDiskMergeImporter importer;
      final ExecutorService sorter = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(),
          newThreadFactory(null, SORTER_THREAD_NAME, true));
      final ExecutorService sorter =
          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
      final LDIFReaderSource source =
          new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
      final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
      try (final Importer dbStorage = rootContainer.getStorage().startImport();
           final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
           final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
      {
        final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
        final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
@@ -241,6 +220,12 @@
          .getEntriesIgnored());
    }
    private static int getDefaultNumberOfThread()
    {
      final int nbProcessors = Runtime.getRuntime().availableProcessors();
      return Math.max(2, DirectoryServer.isRunning() ? nbProcessors / 2 : nbProcessors);
    }
    private int getIndexCount() throws ConfigException
    {
      int indexCount = 2; // dn2id, dn2uri
@@ -307,29 +292,15 @@
        return;
      }
      rootContainer.getStorage().close();
      final int threadCount = Runtime.getRuntime().availableProcessors();
      final int threadCount = getDefaultNumberOfThread();
      final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
      final int bufferSize;
      if (BufferPool.SUPPORTS_OFF_HEAP)
      {
        bufferSize = MAX_BUFFER_SIZE;
        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
            DB_CACHE_SIZE, (((long) bufferSize) * nbBuffer) / MB, nbBuffer, bufferSize / KB);
      }
      else
      {
        bufferSize = computeBufferSize(nbBuffer);
        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      }
      final ExecutorService sorter = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(),
          newThreadFactory(null, SORTER_THREAD_NAME, true));
      final ExecutorService sorter =
          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
      final OnDiskMergeImporter importer;
      final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
      try (final Importer dbStorage = rootContainer.getStorage().startImport();
           final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
           final BufferPool bufferPool = newBufferPool(nbBuffer))
      {
        final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
            rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
@@ -349,6 +320,44 @@
      logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
    }
    public BufferPool newBufferPool(int nbBuffers) throws InitializationException
    {
      // Try off-heap direct buffer
      int bufferSize = MAX_BUFFER_SIZE;
      do
      {
        try
        {
          final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
              DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
          return pool;
        }
        catch (OutOfMemoryError e)
        {
          bufferSize /= 2;
        }
      }
      while (bufferSize > MIN_BUFFER_SIZE);
      // Off-line mode or direct memory allocation failed.
      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
      long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      if (availableMemory < minimumRequiredMemory)
      {
        // Not enough memory.
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
      }
      bufferSize = (int) (availableMemory / nbBuffers);
      if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
      {
        bufferSize = MAX_BUFFER_SIZE;
      }
      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      return new BufferPool(nbBuffers, bufferSize, false);
    }
    private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig,
        long totalEntries) throws InitializationException
    {
@@ -489,20 +498,6 @@
      return tempDir;
    }
    private int computeBufferSize(int nbBuffer) throws InitializationException
    {
      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
      final long minimumRequiredMemory = nbBuffer * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      if (availableMemory < minimumRequiredMemory)
      {
        // Not enough memory.
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
      }
      return Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE);
    }
    /**
     * Calculates the amount of available memory which can be used by this import, taking into account whether
     * the import is running offline or online as a task.
@@ -519,6 +514,11 @@
      else
      {
        // Offline import/rebuild.
        // call twice gc to ensure finalizers are called
        // and young to old gen references are properly gc'd
        Runtime.getRuntime().gc();
        Runtime.getRuntime().gc();
        totalAvailableMemory = Platform.getUsableMemoryForCaching();
      }
@@ -823,7 +823,7 @@
  }
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  private static final int MAX_BUFFER_SIZE = 4 * MB;
  /** Min size of phase one buffer. */
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  /** DB cache size to use during import. */
@@ -1621,7 +1621,7 @@
    {
      private final String metricName;
      private final BufferPool bufferPool;
      private final Buffer buffer;
      private final MemoryBuffer buffer;
      private long totalBytes;
      private int indexPos;
      private int dataPos;
@@ -2727,67 +2727,26 @@
    }
  }
  /** Buffer used by {@link InMemorySortedChunk} to store and sort data. */
  interface Buffer extends Closeable
  {
    void writeInt(int position, int value);
    int readInt(int position);
    ByteString readByteString(int position, int length);
    void writeByteSequence(int position, ByteSequence data);
    int length();
    int compare(int offsetA, int lengthA, int offsetB, int lengthB);
  }
  /**
   * Pre-allocate and maintain a fixed number of re-usable {@code Buffer}s. This allow to keep controls of heap memory
   * consumption and prevents the significant object allocation cost occurring for huge objects.
   */
  static final class BufferPool implements Closeable
  {
    private static final Object UNSAFE_OBJECT;
    static final boolean SUPPORTS_OFF_HEAP;
    static
    {
      Object unsafeObject = null;
      try
      {
        final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
        final Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
        theUnsafeField.setAccessible(true);
        unsafeObject = theUnsafeField.get(null);
      }
      catch (Throwable e)
      {
        // Unsupported.
      }
      UNSAFE_OBJECT = unsafeObject;
      SUPPORTS_OFF_HEAP = UNSAFE_OBJECT != null;
    }
    private final BlockingQueue<MemoryBuffer> pool;
    private final BlockingQueue<Buffer> pool;
    private final int bufferSize;
    BufferPool(int nbBuffer, int bufferSize)
    BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
    {
      this.pool = new ArrayBlockingQueue<>(nbBuffer);
      this.bufferSize = bufferSize;
      for (int i = 0; i < nbBuffer; i++)
      {
        pool.offer(SUPPORTS_OFF_HEAP ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize));
        pool.offer(new MemoryBuffer(allocateDirect
                        ? ByteBuffer.allocateDirect(bufferSize)
                        : ByteBuffer.allocate(bufferSize)));
      }
    }
    public int getBufferSize()
    {
      return bufferSize;
    }
    private Buffer get()
    private MemoryBuffer get()
    {
      try
      {
@@ -2799,7 +2758,7 @@
      }
    }
    private void release(Buffer buffer)
    private void release(MemoryBuffer buffer)
    {
      try
      {
@@ -2811,183 +2770,71 @@
      }
    }
    public void setSize(int size)
    {
      while (pool.size() > size)
      {
        get();
      }
    }
    @Override
    public void close()
    {
      Buffer buffer;
      while ((buffer = pool.poll()) != null)
      {
        closeSilently(buffer);
      }
      pool.clear();
    }
    /** Off-heap buffer using Unsafe memory access. */
    @SuppressWarnings("restriction")
    static final class OffHeapBuffer implements Buffer
    {
      private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
      private static final long BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
      private final long address;
      private final int size;
      private int position;
      private final OutputStream asOutputStream = new OutputStream()
      {
        @Override
        public void write(int value) throws IOException
        {
          UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
        }
        @Override
        public void write(byte[] b) throws IOException {
            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
            position += b.length;
        }
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
            position += b.length;
        }
      };
      private boolean closed;
      OffHeapBuffer(int size)
      {
        this.size = size;
        this.address = UNSAFE.allocateMemory(size);
      }
      @Override
      public void writeInt(final int position, final int value)
      {
        UNSAFE.putInt(address + position, value);
      }
      @Override
      public int readInt(final int position)
      {
        return UNSAFE.getInt(address + position);
      }
      @Override
      public void writeByteSequence(final int position, ByteSequence data)
      {
        Reject.ifFalse(position + data.length() <= size);
        this.position = position;
        try
        {
          data.copyTo(asOutputStream);
        }
        catch(IOException e)
        {
          throw new StorageRuntimeException(e);
        }
      }
      @Override
      public int length()
      {
        return size;
      }
      @Override
      public ByteString readByteString(int position, int length)
      {
        Reject.ifFalse(position + length <= size);
        final byte[] data = new byte[length];
        UNSAFE.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length);
        return ByteString.wrap(data);
      }
      @Override
      public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
      {
        final int len = Math.min(lengthA, lengthB);
        for(int i = 0 ; i < len ; i++)
        {
          final int a = UNSAFE.getByte(address + offsetA + i) & 0xFF;
          final int b = UNSAFE.getByte(address + offsetB + i) & 0xFF;
          if ( a != b )
          {
            return a - b;
          }
        }
        return lengthA - lengthB;
      }
      @Override
      public void close()
      {
        if (!closed)
        {
          UNSAFE.freeMemory(address);
        }
        closed = true;
      }
    }
    /** Heap buffer using ByteBuffer. */
    static final class HeapBuffer implements Buffer
    /** Buffer wrapping a ByteBuffer. */
    @NotThreadSafe
    static final class MemoryBuffer
    {
      private final ByteBuffer buffer;
      HeapBuffer(int size)
      MemoryBuffer(final ByteBuffer byteBuffer)
      {
        this.buffer = ByteBuffer.allocate(size);
        this.buffer = byteBuffer;
      }
      @Override
      public void writeInt(final int position, final int value)
      void writeInt(final int position, final int value)
      {
        buffer.putInt(position, value);
      }
      @Override
      public int readInt(final int position)
      int readInt(final int position)
      {
        return buffer.getInt(position);
      }
      @Override
      public void writeByteSequence(int position, ByteSequence data)
      void writeByteSequence(int position, ByteSequence data)
      {
        buffer.position(position);
        data.copyTo(buffer);
      }
      @Override
      public int length()
      int length()
      {
        return buffer.capacity();
      }
      @Override
      public ByteString readByteString(int position, int length)
      ByteString readByteString(int position, int length)
      {
        return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
        if (buffer.hasArray())
        {
          return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
        }
        final byte[] data = new byte[length];
        buffer.position(position);
        buffer.get(data);
        return ByteString.wrap(data);
      }
      @Override
      public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
      int compare(int offsetA, int lengthA, int offsetB, int lengthB)
      {
        return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB));
      }
      @Override
      public void close()
      {
        // Nothing to do
        int count = Math.min(lengthA, lengthB);
        int i = offsetA;
        int j = offsetB;
        while (count-- != 0)
        {
            final int firstByte = 0xFF & buffer.get(i++);
            final int secondByte = 0xFF & buffer.get(j++);
            if (firstByte != secondByte)
            {
                return firstByte - secondByte;
            }
        }
        return lengthA - lengthB;
      }
    }
  }