mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
27.31.2016 134f26626acf0f76f0e999875f3c5a90c7b16034
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -42,6 +42,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
@@ -121,13 +124,11 @@
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.util.Platform;
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
@@ -169,13 +170,11 @@
    /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
    private static final int SMALL_HEAP_SIZE = 256 * MB;
    private final ServerContext serverContext;
    private final RootContainer rootContainer;
    private final PluggableBackendCfg backendCfg;
    StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
    StrategyImpl(RootContainer rootContainer, PluggableBackendCfg backendCfg)
    {
      this.serverContext = serverContext;
      this.rootContainer = rootContainer;
      this.backendCfg = backendCfg;
    }
@@ -337,40 +336,43 @@
      logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
    }
    public BufferPool newBufferPool(int nbBuffers) throws InitializationException
    public BufferPool newBufferPool(final int nbBuffers) throws InitializationException
    {
      // Try off-heap direct buffer
      int bufferSize = MAX_BUFFER_SIZE;
      do
      final Long offheapMemorySize = backendCfg.getImportOffheapMemorySize();
      final long offHeapMemoryAvailable = offheapMemorySize != null ? offheapMemorySize : 0;
      if (offHeapMemoryAvailable > 0)
      {
        try
        // Try off-heap direct buffer
        logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, offHeapMemoryAvailable, nbBuffers);
        int bufferSize = (int) (offHeapMemoryAvailable / nbBuffers);
        while (bufferSize > MIN_BUFFER_SIZE)
        {
          final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
              DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
          return pool;
        }
        catch (OutOfMemoryError e)
        {
          bufferSize /= 2;
          try
          {
            final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
            final long usedOffHeapMemory = (((long) bufferSize) * nbBuffers) / MB;
            logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
                DB_CACHE_SIZE / MB, usedOffHeapMemory, nbBuffers, bufferSize / KB);
            return pool;
          }
          catch (OutOfMemoryError e)
          {
            bufferSize /= 2;
          }
        }
      }
      while (bufferSize > MIN_BUFFER_SIZE);
      // Off-line mode or direct memory allocation failed.
      // Off-heap memory allocation has failed or is disabled.
      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
      long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      final long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      if (availableMemory < minimumRequiredMemory)
      {
        // Not enough memory.
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
      }
      bufferSize = (int) (availableMemory / nbBuffers);
      if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
      {
        bufferSize = MAX_BUFFER_SIZE;
      }
      final long buffersMemory = availableMemory - DB_CACHE_SIZE - REQUIRED_FREE_MEMORY;
      final int bufferSize = Math.min(((int) (buffersMemory / nbBuffers)), MAX_BUFFER_SIZE);
      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      return new BufferPool(nbBuffers, bufferSize, false);
    }
@@ -477,7 +479,6 @@
      throw new InitializationException(ERR_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(name));
    }
    private static Collection<String> getIndexNames(IndexType indexType, AttributeIndex attrIndex)
    {
      if (indexType.equals(IndexType.PRESENCE))
@@ -521,35 +522,41 @@
     */
    private long calculateAvailableHeapMemoryForBuffers()
    {
      final long totalAvailableMemory;
      if (DirectoryServer.isRunning())
      {
        // Online import/rebuild.
        final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
        totalAvailableMemory = Math.max(availableMemory, 16 * MB);
      }
      else
      {
        // Offline import/rebuild.
        // call twice gc to ensure finalizers are called
        // and young to old gen references are properly gc'd
        Runtime.getRuntime().gc();
        Runtime.getRuntime().gc();
        totalAvailableMemory = Platform.getUsableMemoryForCaching();
      }
      // Now take into account various fudge factors.
      int importMemPct = 90;
      final long totalAvailableMemory = getAvailableMemoryAfterGC();
      int importMemPct = 100;
      if (totalAvailableMemory <= SMALL_HEAP_SIZE)
      {
        // Be pessimistic when memory is low.
        importMemPct -= 25;
        importMemPct -= 65;
      }
      return (totalAvailableMemory * importMemPct / 100);
    }
  }
  private static long getAvailableMemoryAfterGC()
  {
    final Runtime runTime = Runtime.getRuntime();
    runTime.gc();
    runTime.gc();
    // First try calculation based on oldgen size
    final List<MemoryPoolMXBean> mpools = ManagementFactory.getMemoryPoolMXBeans();
    for (final MemoryPoolMXBean mpool : mpools)
    {
      final MemoryUsage usage = mpool.getUsage();
      if (usage != null && mpool.getName().endsWith("Old Gen") && usage.getMax() > 0)
      {
        final long max = usage.getMax();
        final long hardLimit = (long) (max * 0.90);
        final long softLimit = (long) (max * 0.69);
        final long used = usage.getUsed();
        return (softLimit > used) ? (softLimit - used) : Math.max(0, hardLimit - used);
      }
    }
    // Fall back to 40% of overall heap size (no need to do gc() again).
    return (runTime.freeMemory() + (runTime.maxMemory() - runTime.totalMemory())) * 40 / 100;
  }
  /** Source of LDAP {@link Entry}s to process. */
  private interface Source
  {