mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
09.13.2016 26455ba31b4830d8b91f672a380e8e2c1cb60e7c
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -123,6 +123,7 @@
import org.opends.server.types.LDIFImportResult;
import org.opends.server.util.Platform;
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
// @Checkstyle:ignore
@@ -177,16 +178,13 @@
    @Override
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
    {
      final long availableMemory = calculateAvailableMemory();
      final int threadCount =
          importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors()
              : importConfig.getThreadCount();
      final int indexCount = getIndexCount();
      final int nbBuffer = threadCount * indexCount * 2;
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
      final int bufferSize = computeBufferSize(nbBuffer, availableMemory);
      final int bufferSize = computeBufferSize(nbBuffer);
      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
@@ -213,6 +211,11 @@
      finally
      {
        sorter.shutdown();
        if (OperatingSystem.isWindows())
        {
          // Try to force the JVM to close mmap()ed files so that they can be deleted.
          System.gc();
        }
        recursiveDelete(tempDir);
      }
      logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
@@ -297,10 +300,9 @@
        return;
      }
      rootContainer.getStorage().close();
      final long availableMemory = calculateAvailableMemory();
      final int threadCount = Runtime.getRuntime().availableProcessors();
      final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
      final int bufferSize = computeBufferSize(nbBuffer, availableMemory);
      final int bufferSize = computeBufferSize(nbBuffer);
      final ExecutorService sorter = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(),
@@ -469,13 +471,16 @@
      return tempDir;
    }
    private static int computeBufferSize(int nbBuffer, long availableMemory) throws InitializationException
    private int computeBufferSize(int nbBuffer) throws InitializationException
    {
      if (BufferPool.supportOffHeap())
      {
        return MAX_BUFFER_SIZE;
      }
      final long availableMemory = calculateAvailableMemory();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
      final int bufferSize = Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE);
      if (bufferSize < MIN_BUFFER_SIZE)
      {
@@ -1428,8 +1433,7 @@
      this.bufferPool = bufferPool;
      this.deduplicator = collector;
      this.file = new File(tempDir, name.replaceAll("\\W+", "_"));
      this.file.deleteOnExit();
      this.channel = open(this.file.toPath(), READ, WRITE, CREATE_NEW, SPARSE);
      this.channel = open(this.file.toPath(), READ, WRITE, CREATE_NEW, SPARSE, DELETE_ON_CLOSE);
      this.sorter = new ExecutorCompletionService<>(sortExecutor);
    }
@@ -1460,7 +1464,15 @@
      try
      {
        return new CollectorCursor<>(
            new CompositeCursor<>(name, waitTasksTermination(sorter, nbSortedChunks.get())), deduplicator);
            new CompositeCursor<ByteString, ByteString>(name,
                waitTasksTermination(sorter, nbSortedChunks.get()))
            {
              public void close()
              {
                super.close();
                closeSilently(channel);
              }
            }, (Collector<?, ByteString>) deduplicator);
      }
      catch (ExecutionException | InterruptedException e)
      {
@@ -2049,7 +2061,7 @@
    }
    /** Provides a globally sorted cursor from multiple sorted cursors. */
    static final class CompositeCursor<K extends Comparable<? super K>, V> implements MeteredCursor<K, V>
    static class CompositeCursor<K extends Comparable<? super K>, V> implements MeteredCursor<K, V>
    {
      /** Contains the non-empty, sorted cursors, ordered by their current key. */
      private final NavigableSet<MeteredCursor<K, V>> orderedCursors;