mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
29.59.2016 997c10c4c5946d76f99f1f2ab9d0786ead1e545c
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -182,58 +182,60 @@
    @Override
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
    {
      final int threadCount =
          importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
      final int indexCount = getIndexCount();
      final int nbRequiredBuffers = threadCount * indexCount * 2;
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
      final long startTime = System.currentTimeMillis();
      final OnDiskMergeImporter importer;
      final ExecutorService sorter =
          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
      final LDIFReaderSource source =
          new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
      final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
      try (final Importer dbStorage = rootContainer.getStorage().startImport();
           final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
      final int maxThreadCount = importConfig.getThreadCount() == 0
          ? getDefaultNumberOfThread()
          : importConfig.getThreadCount();
      final int nbBuffersPerThread = 2 * getIndexCount();
      try(final BufferPool bufferPool = newBufferPool(maxThreadCount, nbBuffersPerThread))
      {
        final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
        final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
            ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
            : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
        importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
        importer.doImport(source);
      }
      finally
      {
        sorter.shutdownNow();
        if (OperatingSystem.isWindows())
        final int threadCount = bufferPool.size() / nbBuffersPerThread;
        logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
        final OnDiskMergeImporter importer;
        final ExecutorService sorter =
            Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
        final LDIFReaderSource source =
            new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
        final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
        try (final Importer dbStorage = rootContainer.getStorage().startImport())
        {
          // Try to force the JVM to close mmap()ed file so that they can be deleted.
          // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
          System.gc();
          Runtime.getRuntime().runFinalization();
          final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
          final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
              ? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
              : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
          importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
          importer.doImport(source);
        }
        recursiveDelete(tempDir);
      }
      logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
          / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
        finally
        {
          sorter.shutdownNow();
          if (OperatingSystem.isWindows())
          {
            // Try to force the JVM to close mmap()ed file so that they can be deleted.
            // (see http://bugs.java.com/view_bug.do?bug_id=4715154)
            System.gc();
            Runtime.getRuntime().runFinalization();
          }
          recursiveDelete(tempDir);
        }
        logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
            / 1000, importer.getPhaseTwoTimeInMillis() / 1000);
      final long importTime = System.currentTimeMillis() - startTime;
      float rate = 0;
      if (importTime > 0)
      {
        rate = 1000f * source.getEntriesRead() / importTime;
      }
      logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
          .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
        final long importTime = System.currentTimeMillis() - startTime;
        float rate = 0;
        if (importTime > 0)
        {
          rate = 1000f * source.getEntriesRead() / importTime;
        }
        logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
            .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
      return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
          .getEntriesIgnored());
        return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
            .getEntriesIgnored());
      }
    }
    private static int getDefaultNumberOfThread()
@@ -308,70 +310,118 @@
        return;
      }
      rootContainer.getStorage().close();
      final int threadCount = getDefaultNumberOfThread();
      final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
      final ExecutorService sorter =
          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
      final OnDiskMergeImporter importer;
      final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
      try (final Importer dbStorage = rootContainer.getStorage().startImport();
           final BufferPool bufferPool = newBufferPool(nbBuffer))
      final int nbBuffersPerThread = 2 * indexesToRebuild.size();
      try (final BufferPool bufferPool = newBufferPool(getDefaultNumberOfThread(), nbBuffersPerThread))
      {
        final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
            rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
        final int threadCount = bufferPool.size() / nbBuffersPerThread;
        final ExecutorService sorter =
            Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
        importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
        importer.doImport(
            new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries));
      }
      finally
      {
        sorter.shutdown();
        recursiveDelete(tempDir);
      }
        final OnDiskMergeImporter importer;
        final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
        try (final Importer dbStorage = rootContainer.getStorage().startImport())
        {
          final AbstractTwoPhaseImportStrategy strategy =
              new RebuildIndexStrategy(
                  rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
      final long totalTime = importer.getTotalTimeInMillis();
      final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
      logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
          importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
          importer.doImport(new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount,
              totalEntries));
        }
        finally
        {
          sorter.shutdown();
          recursiveDelete(tempDir);
        }
        final long totalTime = importer.getTotalTimeInMillis();
        final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
        logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
      }
    }
    public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
    /**
     * Try to allocate a {@link BufferPool} with a number of buffer in it being a multiple of {@code nbBuffers} in the
     * range [1, {@code maxThreadCount}] depending of the amount of memory available.
     */
    BufferPool newBufferPool(final int maxThreadCount, final int nbBuffers) throws InitializationException
    {
      final int initialThreadCount = maxThreadCount;
      final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
      final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
      if (offHeapMemoryAvailable > 0)
      boolean useOffHeap = (offheapMemorySize != null && offheapMemorySize > 0);
      long memoryAvailable =
          useOffHeap ? offheapMemorySize.longValue() : calculateAvailableHeapMemoryForBuffersAfterGC();
      int threadCount = initialThreadCount;
      for (;;)
      {
        // Try off-heap direct buffer
        logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
        int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
        while (bufferSize > MIN_BUFFER_SIZE)
        final int nbRequiredBuffers = threadCount * nbBuffers;
        try
        {
          try
          return useOffHeap
              ? newOffHeapBufferPool(nbRequiredBuffers, memoryAvailable)
              : newHeapBufferPool(nbRequiredBuffers, memoryAvailable);
        }
        catch (InitializationException e)
        {
          if (threadCount > 1)
          {
            final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
            final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
            logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
                DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
            return pool;
            // Retry using less buffer by reducing the number of importer threads.
            threadCount--;
          }
          catch (OutOfMemoryError e)
          else if (useOffHeap)
          {
            bufferSize /= 2;
            // Retry using on-heap buffer
            useOffHeap = false;
            memoryAvailable = calculateAvailableHeapMemoryForBuffersAfterGC();
            threadCount = initialThreadCount;
          }
          else
          {
            throw e;
          }
        }
      }
    }
      // Off-heap memory allocation has failed or is disabled.
      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
    private BufferPool newOffHeapBufferPool(final int nbBuffers, long offHeapMemoryAvailable)
        throws InitializationException
    {
      // Try off-heap direct buffer
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
      int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
      while (bufferSize > MIN_BUFFER_SIZE)
      {
        BufferPool pool = null;
        try
        {
          pool = new BufferPool(nbBuffers, bufferSize, true);
          final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers,
              bufferSize / KB);
          return pool;
        }
        catch (OutOfMemoryError e)
        {
          bufferSize /= 2;
          closeSilently(pool);
          pool = null;
        }
      }
      throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapMemoryAvailable, nbBuffers
          * MIN_BUFFER_SIZE));
    }
    private BufferPool newHeapBufferPool(final int nbBuffers, final long heapMemoryAvailable)
        throws InitializationException
    {
      final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      if (availableMemory < minimumRequiredMemory)
      if (heapMemoryAvailable < minimumRequiredMemory)
      {
        // Not enough memory.
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(heapMemoryAvailable, minimumRequiredMemory));
      }
      final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, heapMemoryAvailable, nbBuffers);
      final long buffersMemory = heapMemoryAvailable - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
      final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      return new BufferPool(nbBuffers, bufferSize, false);
@@ -520,41 +570,26 @@
     * Calculates the amount of available memory which can be used by this import, taking into account whether
     * the import is running offline or online as a task.
     */
    private long calculateAvailableHeapMemoryForBuffers()
    private long calculateAvailableHeapMemoryForBuffersAfterGC()
    {
      final long totalAvailableMemory = getAvailableMemoryAfterGC();
      int importMemPct = 100;
      if (totalAvailableMemory <= SMALL_HEAP_SIZE)
      {
        // Be pessimistic when memory is low.
        importMemPct -= 65;
      }
      return (totalAvailableMemory * importMemPct / 100);
    }
  }
      final Runtime runTime = Runtime.getRuntime();
      runTime.gc();
      runTime.gc();
  private static long getAvailableMemoryAfterGC()
  {
    final Runtime runTime = Runtime.getRuntime();
    runTime.gc();
    runTime.gc();
    // First try calculation based on oldgen size
    final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
    for (final MemoryPoolMXBean mpool : mpools)
    {
      final MemoryUsage usage = mpool.getUsage();
      if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
      // First try calculation based on oldgen size
      final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
      for (final MemoryPoolMXBean mpool : mpools)
      {
        final long max = usage.getMax();
        final long hardLimit = (long) (max * 0.90);
        final long softLimit = (long) (max * 0.69);
        final long used = usage.getUsed();
        return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
        final MemoryUsage usage = mpool.getUsage();
        if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
        {
          final long max = usage.getMax();
          return (max > SMALL_HEAP_SIZE ? (max * 90 / 100) : (max * 70 / 100));
        }
      }
      // Fall back to 40% of overall heap size (no need to do gc() again).
      return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
    }
    // Fall back to 40% of overall heap size (no need to do gc() again).
    return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
  }
  /** Source of LDAP {@link Entry}s to process. */
@@ -849,7 +884,7 @@
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 4 * MB;
  /** Min size of phase one buffer. */
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  private static final int MIN_BUFFER_SIZE = 32 * KB;
  /** DB cache size to use during import. */
  private static final int DB_CACHE_SIZE = 32 * MB;
  /** Required free memory for this importer. */
@@ -2771,12 +2806,25 @@
    BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
    {
      this.pool = new ArrayBlockingQueue<>(nbBuffer);
      for (int i = 0; i < nbBuffer; i++)
      try
      {
        pool.offer(new MemoryBuffer(allocateDirect
                        ? ByteBuffer.allocateDirect(bufferSize)
                        : ByteBuffer.allocate(bufferSize)));
        for (int i = 0; i < nbBuffer; i++)
        {
          pool.offer(new MemoryBuffer(allocateDirect
                          ? ByteBuffer.allocateDirect(bufferSize)
                          : ByteBuffer.allocate(bufferSize)));
        }
      }
      catch(OutOfMemoryError e)
      {
        close();
        throw e;
      }
    }
    private int size()
    {
      return pool.size();
    }
    private MemoryBuffer get()