mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
14.07.2016 acac2a13ebe79697f86272cacd61541955c9d343
OPENDJ-3123: Reduce memory pressure by limiting the number of thread.

When performed online, the number of processing thread used is set by
default to half the number of processors as reported by the JVM runtime.

offHeapSize argument has been removed by replacing Unsafe Buffer by
direct memory buffer. Direct memory buffer allows a try/reduce buffer
size/retry loop until we found a correct buffer size.
10 files modified
473 ■■■■ changed files
opendj-server-legacy/resource/schema/02-config.ldif 7 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java 353 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java 7 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java 19 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java 25 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/messages/org/opends/messages/backend.properties 6 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/messages/org/opends/messages/tool.properties 11 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java 40 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java 1 ●●●● patch | view | raw | blame | history
opendj-server-legacy/resource/schema/02-config.ldif
@@ -3837,12 +3837,6 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
  SINGLE-VALUE
  X-ORIGIN 'OpenDJ Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.158
  NAME 'ds-task-import-offheap-size'
  EQUALITY integerMatch
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDJ Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.160
  NAME 'ds-cfg-base-path'
  EQUALITY caseIgnoreMatch
@@ -4632,7 +4626,6 @@
        ds-task-import-is-encrypted $
        ds-task-import-backend-id $
        ds-task-import-thread-count $
        ds-task-import-offheap-size $
        ds-task-import-clear-backend )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.64
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -27,7 +27,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -94,7 +93,7 @@
import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter;
import org.opends.server.backends.pluggable.DN2ID.TreeVisitor;
import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -117,8 +116,7 @@
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
// @Checkstyle:ignore
import sun.misc.Unsafe;
import net.jcip.annotations.NotThreadSafe;
/**
 * Imports LDIF data contained in files into the database. Because of the B-Tree structure used in backend, import is
@@ -170,41 +168,22 @@
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
    {
      final int threadCount =
          importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors()
              : importConfig.getThreadCount();
          importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
      final int indexCount = getIndexCount();
      final int nbBuffer = threadCount * indexCount * 2;
      final int bufferSize;
      if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
      {
        final long offHeapSize = importConfig.getOffHeapSize();
        bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
        if (bufferSize < MIN_BUFFER_SIZE)
        {
          // Not enough memory.
          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
        }
        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
      }
      else
      {
        bufferSize = computeBufferSize(nbBuffer);
        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      }
      final int nbRequiredBuffers = threadCount * indexCount * 2;
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
      final long startTime = System.currentTimeMillis();
      final OnDiskMergeImporter importer;
      final ExecutorService sorter = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(),
          newThreadFactory(null, SORTER_THREAD_NAME, true));
      final ExecutorService sorter =
          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
      final LDIFReaderSource source =
          new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
      final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
      try (final Importer dbStorage = rootContainer.getStorage().startImport();
           final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
           final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
      {
        final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
        final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
@@ -241,6 +220,12 @@
          .getEntriesIgnored());
    }
    private static int getDefaultNumberOfThread()
    {
      final int nbProcessors = Runtime.getRuntime().availableProcessors();
      return Math.max(2, DirectoryServer.isRunning() ? nbProcessors / 2 : nbProcessors);
    }
    private int getIndexCount() throws ConfigException
    {
      int indexCount = 2; // dn2id, dn2uri
@@ -307,29 +292,15 @@
        return;
      }
      rootContainer.getStorage().close();
      final int threadCount = Runtime.getRuntime().availableProcessors();
      final int threadCount = getDefaultNumberOfThread();
      final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
      final int bufferSize;
      if (BufferPool.SUPPORTS_OFF_HEAP)
      {
        bufferSize = MAX_BUFFER_SIZE;
        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
            DB_CACHE_SIZE, (((long) bufferSize) * nbBuffer) / MB, nbBuffer, bufferSize / KB);
      }
      else
      {
        bufferSize = computeBufferSize(nbBuffer);
        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      }
      final ExecutorService sorter = Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(),
          newThreadFactory(null, SORTER_THREAD_NAME, true));
      final ExecutorService sorter =
          Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
      final OnDiskMergeImporter importer;
      final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
      try (final Importer dbStorage = rootContainer.getStorage().startImport();
           final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
           final BufferPool bufferPool = newBufferPool(nbBuffer))
      {
        final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
            rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
@@ -349,6 +320,44 @@
      logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
    }
    public BufferPool newBufferPool(int nbBuffers) throws InitializationException
    {
      // Try off-heap direct buffer
      int bufferSize = MAX_BUFFER_SIZE;
      do
      {
        try
        {
          final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
          logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
              DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
          return pool;
        }
        catch (OutOfMemoryError e)
        {
          bufferSize /= 2;
        }
      }
      while (bufferSize > MIN_BUFFER_SIZE);
      // Off-line mode or direct memory allocation failed.
      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
      long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      if (availableMemory < minimumRequiredMemory)
      {
        // Not enough memory.
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
      }
      bufferSize = (int) (availableMemory / nbBuffers);
      if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
      {
        bufferSize = MAX_BUFFER_SIZE;
      }
      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      return new BufferPool(nbBuffers, bufferSize, false);
    }
    private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig,
        long totalEntries) throws InitializationException
    {
@@ -489,20 +498,6 @@
      return tempDir;
    }
    private int computeBufferSize(int nbBuffer) throws InitializationException
    {
      final long availableMemory = calculateAvailableHeapMemoryForBuffers();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
      final long minimumRequiredMemory = nbBuffer * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
      if (availableMemory < minimumRequiredMemory)
      {
        // Not enough memory.
        throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
      }
      return Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE);
    }
    /**
     * Calculates the amount of available memory which can be used by this import, taking into account whether
     * the import is running offline or online as a task.
@@ -519,6 +514,11 @@
      else
      {
        // Offline import/rebuild.
        // call twice gc to ensure finalizers are called
        // and young to old gen references are properly gc'd
        Runtime.getRuntime().gc();
        Runtime.getRuntime().gc();
        totalAvailableMemory = Platform.getUsableMemoryForCaching();
      }
@@ -823,7 +823,7 @@
  }
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  private static final int MAX_BUFFER_SIZE = 4 * MB;
  /** Min size of phase one buffer. */
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  /** DB cache size to use during import. */
@@ -1621,7 +1621,7 @@
    {
      private final String metricName;
      private final BufferPool bufferPool;
      private final Buffer buffer;
      private final MemoryBuffer buffer;
      private long totalBytes;
      private int indexPos;
      private int dataPos;
@@ -2727,67 +2727,26 @@
    }
  }
  /** Buffer used by {@link InMemorySortedChunk} to store and sort data. */
  interface Buffer extends Closeable
  {
    void writeInt(int position, int value);
    int readInt(int position);
    ByteString readByteString(int position, int length);
    void writeByteSequence(int position, ByteSequence data);
    int length();
    int compare(int offsetA, int lengthA, int offsetB, int lengthB);
  }
  /**
   * Pre-allocate and maintain a fixed number of re-usable {@code Buffer}s. This allow to keep controls of heap memory
   * consumption and prevents the significant object allocation cost occurring for huge objects.
   */
  static final class BufferPool implements Closeable
  {
    private static final Object UNSAFE_OBJECT;
    static final boolean SUPPORTS_OFF_HEAP;
    static
    {
      Object unsafeObject = null;
      try
      {
        final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
        final Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
        theUnsafeField.setAccessible(true);
        unsafeObject = theUnsafeField.get(null);
      }
      catch (Throwable e)
      {
        // Unsupported.
      }
      UNSAFE_OBJECT = unsafeObject;
      SUPPORTS_OFF_HEAP = UNSAFE_OBJECT != null;
    }
    private final BlockingQueue<MemoryBuffer> pool;
    private final BlockingQueue<Buffer> pool;
    private final int bufferSize;
    BufferPool(int nbBuffer, int bufferSize)
    BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
    {
      this.pool = new ArrayBlockingQueue<>(nbBuffer);
      this.bufferSize = bufferSize;
      for (int i = 0; i < nbBuffer; i++)
      {
        pool.offer(SUPPORTS_OFF_HEAP ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize));
        pool.offer(new MemoryBuffer(allocateDirect
                        ? ByteBuffer.allocateDirect(bufferSize)
                        : ByteBuffer.allocate(bufferSize)));
      }
    }
    public int getBufferSize()
    {
      return bufferSize;
    }
    private Buffer get()
    private MemoryBuffer get()
    {
      try
      {
@@ -2799,7 +2758,7 @@
      }
    }
    private void release(Buffer buffer)
    private void release(MemoryBuffer buffer)
    {
      try
      {
@@ -2811,183 +2770,71 @@
      }
    }
    public void setSize(int size)
    {
      while (pool.size() > size)
      {
        get();
      }
    }
    @Override
    public void close()
    {
      Buffer buffer;
      while ((buffer = pool.poll()) != null)
      {
        closeSilently(buffer);
      }
      pool.clear();
    }
    /** Off-heap buffer using Unsafe memory access. */
    @SuppressWarnings("restriction")
    static final class OffHeapBuffer implements Buffer
    {
      private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
      private static final long BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
      private final long address;
      private final int size;
      private int position;
      private final OutputStream asOutputStream = new OutputStream()
      {
        @Override
        public void write(int value) throws IOException
        {
          UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
        }
        @Override
        public void write(byte[] b) throws IOException {
            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
            position += b.length;
        }
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
            position += b.length;
        }
      };
      private boolean closed;
      OffHeapBuffer(int size)
      {
        this.size = size;
        this.address = UNSAFE.allocateMemory(size);
      }
      @Override
      public void writeInt(final int position, final int value)
      {
        UNSAFE.putInt(address + position, value);
      }
      @Override
      public int readInt(final int position)
      {
        return UNSAFE.getInt(address + position);
      }
      @Override
      public void writeByteSequence(final int position, ByteSequence data)
      {
        Reject.ifFalse(position + data.length() <= size);
        this.position = position;
        try
        {
          data.copyTo(asOutputStream);
        }
        catch(IOException e)
        {
          throw new StorageRuntimeException(e);
        }
      }
      @Override
      public int length()
      {
        return size;
      }
      @Override
      public ByteString readByteString(int position, int length)
      {
        Reject.ifFalse(position + length <= size);
        final byte[] data = new byte[length];
        UNSAFE.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length);
        return ByteString.wrap(data);
      }
      @Override
      public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
      {
        final int len = Math.min(lengthA, lengthB);
        for(int i = 0 ; i < len ; i++)
        {
          final int a = UNSAFE.getByte(address + offsetA + i) & 0xFF;
          final int b = UNSAFE.getByte(address + offsetB + i) & 0xFF;
          if ( a != b )
          {
            return a - b;
          }
        }
        return lengthA - lengthB;
      }
      @Override
      public void close()
      {
        if (!closed)
        {
          UNSAFE.freeMemory(address);
        }
        closed = true;
      }
    }
    /** Heap buffer using ByteBuffer. */
    static final class HeapBuffer implements Buffer
    /** Buffer wrapping a ByteBuffer. */
    @NotThreadSafe
    static final class MemoryBuffer
    {
      private final ByteBuffer buffer;
      HeapBuffer(int size)
      MemoryBuffer(final ByteBuffer byteBuffer)
      {
        this.buffer = ByteBuffer.allocate(size);
        this.buffer = byteBuffer;
      }
      @Override
      public void writeInt(final int position, final int value)
      void writeInt(final int position, final int value)
      {
        buffer.putInt(position, value);
      }
      @Override
      public int readInt(final int position)
      int readInt(final int position)
      {
        return buffer.getInt(position);
      }
      @Override
      public void writeByteSequence(int position, ByteSequence data)
      void writeByteSequence(int position, ByteSequence data)
      {
        buffer.position(position);
        data.copyTo(buffer);
      }
      @Override
      public int length()
      int length()
      {
        return buffer.capacity();
      }
      @Override
      public ByteString readByteString(int position, int length)
      ByteString readByteString(int position, int length)
      {
        if (buffer.hasArray())
      {
        return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
      }
      @Override
      public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
      {
        return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB));
        final byte[] data = new byte[length];
        buffer.position(position);
        buffer.get(data);
        return ByteString.wrap(data);
      }
      @Override
      public void close()
      int compare(int offsetA, int lengthA, int offsetB, int lengthB)
      {
        // Nothing to do
        int count = Math.min(lengthA, lengthB);
        int i = offsetA;
        int j = offsetB;
        while (count-- != 0)
        {
            final int firstByte = 0xFF & buffer.get(i++);
            final int secondByte = 0xFF & buffer.get(j++);
            if (firstByte != secondByte)
            {
                return firstByte - secondByte;
            }
        }
        return lengthA - lengthB;
      }
    }
  }
opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
@@ -3829,13 +3829,6 @@
       NAME_PREFIX_TASK + "import-thread-count";
  /**
   * The name of the attribute in an import task definition that specifies the
   * off-heap memory size used during the import.
   */
  public static final String ATTR_IMPORT_OFFHEAP_SIZE =
       NAME_PREFIX_TASK + "import-offheap-size";
  /**
   * The name of the attribute in an import task definition that specifies
   * whether the import process should append to the existing database rather
   * than overwriting it.
opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
@@ -93,7 +93,6 @@
  private boolean skipDNValidation;
  private String tmpDirectory;
  private int threadCount;
  private int offHeapSize;
  private String backendID;
  private String rejectFile;
  private String skipFile;
@@ -153,7 +152,6 @@
    AttributeType typeClearBackend = getSchema().getAttributeType(ATTR_IMPORT_CLEAR_BACKEND);
    AttributeType typeRandomSeed = getSchema().getAttributeType(ATTR_IMPORT_RANDOM_SEED);
    AttributeType typeThreadCount = getSchema().getAttributeType(ATTR_IMPORT_THREAD_COUNT);
    AttributeType typeOffHeapSize = getSchema().getAttributeType(ATTR_IMPORT_OFFHEAP_SIZE);
    AttributeType typeTmpDirectory = getSchema().getAttributeType(ATTR_IMPORT_TMP_DIRECTORY);
    AttributeType typeDNCheckPhase2 = getSchema().getAttributeType(ATTR_IMPORT_SKIP_DN_VALIDATION);
@@ -209,7 +207,6 @@
    clearBackend = asBoolean(taskEntry, typeClearBackend);
    randomSeed = asInt(taskEntry, typeRandomSeed);
    threadCount = asInt(taskEntry, typeThreadCount);
    offHeapSize = asInt(taskEntry, typeOffHeapSize);
    // Make sure that either the "includeBranchStrings" argument or the
    // "backendID" argument was provided.
@@ -587,7 +584,6 @@
    importConfig.setSkipDNValidation(skipDNValidation);
    importConfig.setTmpDirectory(tmpDirectory);
    importConfig.setThreadCount(threadCount);
    importConfig.setOffHeapSize(offHeapSize);
    // FIXME -- Should this be conditional?
    importConfig.setInvokeImportPlugins(true);
opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
@@ -136,7 +136,6 @@
  private StringArgument  templateFile;
  private BooleanArgument skipDNValidation;
  private IntegerArgument threadCount;
  private IntegerArgument offHeapSize;
  private StringArgument  tmpDirectory;
  private int process(String[] args, boolean initializeServer,
@@ -360,13 +359,6 @@
                      .defaultValue(0)
                      .valuePlaceholder(INFO_LDIFIMPORT_THREAD_COUNT_PLACEHOLDER.get())
                      .buildAndAddToParser(argParser);
      offHeapSize =
              IntegerArgument.builder("offHeapSize")
                      .description(INFO_LDIFIMPORT_DESCRIPTION_OFFHEAP_SIZE.get())
                      .lowerBound(0)
                      .defaultValue(700)
                      .valuePlaceholder(INFO_LDIFIMPORT_OFFHEAP_SIZE_PLACEHOLDER.get())
                      .buildAndAddToParser(argParser);
      tmpDirectory =
              StringArgument.builder("tmpdirectory")
                      .description(INFO_LDIFIMPORT_DESCRIPTION_TEMP_DIRECTORY.get())
@@ -407,7 +399,6 @@
    addAttribute(attributes, ATTR_IMPORT_TEMPLATE_FILE, templateFile.getValue());
    addAttribute(attributes, ATTR_IMPORT_RANDOM_SEED, randomSeed.getValue());
    addAttribute(attributes, ATTR_IMPORT_THREAD_COUNT, threadCount.getValue());
    addAttribute(attributes, ATTR_IMPORT_OFFHEAP_SIZE, offHeapSize.getValue());
    // Optional attributes
    addAttribute2(attributes, ATTR_IMPORT_BACKEND_ID, backendID);
@@ -794,16 +785,6 @@
        return 1;
    }
    try
    {
      importConfig.setOffHeapSize(offHeapSize.getIntValue());
    }
    catch (Exception e)
    {
      logger.error(ERR_LDIFIMPORT_CANNOT_PARSE_OFFHEAP_SIZE, offHeapSize.getValue(), e.getMessage());
      return 1;
    }
    importConfig.setBufferSize(LDIF_BUFFER_SIZE);
    importConfig.setExcludeAllUserAttributes(excludeAllUserAttributes);
    importConfig.setExcludeAllOperationalAttributes(excludeAllOperationalAttributes);
opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
@@ -125,10 +125,6 @@
  private boolean skipDNValidation;
  private int threadCount;
  /** Indicates the memory size, in megabytes, to use for off-heap buffers. */
  private int offHeapSize;
  /**
   * Creates a new LDIF import configuration that will read from the
   * specified LDIF file.
@@ -1061,27 +1057,6 @@
    return skipDNValidation;
  }
  /**
   * Set the memory size available for off-heap buffers.
   *
   * @param sizeInMb The memory size available expressed in megabytes.
   */
  public void setOffHeapSize(int sizeInMb)
  {
    this.offHeapSize = sizeInMb;
  }
  /**
   * Get the memory size available for off-heap buffers.
   *
   * @return The memory size in megabytes.
   */
  public int getOffHeapSize()
  {
    return offHeapSize;
  }
  /**
   * Set the thread count.
   *
opendj-server-legacy/src/messages/org/opends/messages/backend.properties
@@ -1002,7 +1002,7 @@
NOTE_IMPORT_LDIF_INDEX_STARTED_523=Index %s phase two started processing \
%d buffers in %d batches
NOTE_IMPORT_LDIF_PHASE_TWO_REPORT_525=Index %s %d%% complete: \
 remaining = %d kb, rate = %d kb/s; batch %d/%d
 remaining = %d KB, rate = %d KB/s; batch %d/%d
NOTE_IMPORT_LDIF_ROOTCONTAINER_CLOSE_526=Import LDIF environment close \
 took %d seconds
NOTE_IMPORT_LDIF_TOT_MEM_BUF_528=The amount of free memory available to \
@@ -1078,8 +1078,8 @@
children for DN <%s> (got %d, expecting %d)
ERR_VERIFY_ID2COUNT_WRONG_ID_597=File id2ChildrenCount references non-existing EntryID <%d>.
NOTE_REBUILD_NOTHING_TO_REBUILD_598=Rebuilding index finished: no indexes to rebuild.
NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO_599=Setting DB cache size to %d bytes. \
 Using %d Mb off-heap memory through %d phase one buffers of %d Kb.
NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO_599=Setting DB cache size to %d MB. \
 Using %d mb off-heap memory through %d phase one buffers of %d KB.
ERR_SCHEMA_PARSE_LINE_600=Ignoring schema definition '%s' because the following error occurred while \
  it was being parsed: %s
ERR_SCHEMA_COULD_NOT_PARSE_DEFINITION_601=Schema definition could not be parsed as valid attribute value
opendj-server-legacy/src/messages/org/opends/messages/tool.properties
@@ -2453,14 +2453,9 @@
INFO_INDEX_NAME_PLACEHOLDER_1894={indexName}
INFO_DESCRIPTION_BACKEND_DEBUG_RAW_DB_NAME_1895=The raw database name
INFO_CHANGE_NUMBER_PLACEHOLDER_1896={change number}
ERR_LDIFIMPORT_CANNOT_PARSE_OFFHEAP_SIZE_1897=The value %s for \
offHeapSize cannot be parsed: %s
INFO_LDIFIMPORT_DESCRIPTION_OFFHEAP_SIZE_1898=Size expressed in megabytes of the off-heap memory dedicated to the \
phase one buffers.
INFO_LDIFIMPORT_OFFHEAP_SIZE_PLACEHOLDER_1899={size in megabytes}
ERR_CANNOT_INITIALIZE_BACKENDS_1900=An error occurred while initializing server backends: %s
ERR_CANNOT_INITIALIZE_SERVER_PLUGINS_1901=An error occurred while initializing plugins: %s
ERR_CANNOT_SUBSYSTEM_NOT_INITIALIZED_1902=Subsystem %s should be initialized first
ERR_CANNOT_INITIALIZE_BACKENDS_1897=An error occurred while initializing server backends: %s
ERR_CANNOT_INITIALIZE_SERVER_PLUGINS_1898=An error occurred while initializing plugins: %s
ERR_CANNOT_SUBSYSTEM_NOT_INITIALIZED_1899=Subsystem %s should be initialized first
# Upgrade tasks
INFO_UPGRADE_TASK_6869_SUMMARY_10000=Fixing de-DE collation matching rule OID
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -19,7 +19,7 @@
import static org.opends.server.backends.pluggable.EntryIDSet.*;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
@@ -42,10 +42,8 @@
import org.mockito.Mockito;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Buffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.HeapBuffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.OffHeapBuffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.EntryIDSetsCollector;
@@ -69,35 +67,29 @@
public class OnDiskMergeImporterTest extends DirectoryServerTestCase
{
  @Test
  public void testHeapBuffer() throws IOException
  public void testHeapBuffer()
  {
    try(Buffer buffer = new HeapBuffer(1024))
    {
      testBufferImplementation(buffer);
    }
    testBufferImplementation(new MemoryBuffer(ByteBuffer.allocate(1024)));
  }
  @Test
  public void testOffHeapBuffer() throws IOException
  public void testOffHeapBuffer()
  {
    if (BufferPool.SUPPORTS_OFF_HEAP)
    {
      try (Buffer buffer = new OffHeapBuffer(1024))
      {
        testBufferImplementation(buffer);
      }
    }
    testBufferImplementation(new MemoryBuffer(ByteBuffer.allocateDirect(1024)));
  }
  private static void testBufferImplementation(Buffer buffer)
  private static void testBufferImplementation(MemoryBuffer buffer)
  {
    final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4 });
    final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4, 1 });
    buffer.writeByteSequence(0, binary);
    buffer.writeInt(4, 1234);
    buffer.writeInt(5, 1234);
    assertThat(buffer.readByteString(0, 4)).isEqualTo(binary);
    assertThat(buffer.readInt(4)).isEqualTo(1234);
    assertThat(buffer.readByteString(0, 5)).isEqualTo(binary);
    assertThat(buffer.readInt(5)).isEqualTo(1234);
    assertThat(buffer.compare(0, 1, 2, 1)).isLessThan(0);
    assertThat(buffer.compare(0, 1, 4, 1)).isEqualTo(0);
    assertThat(buffer.compare(1, 1, 0, 1)).isGreaterThan(0);
  }
  @Test
@@ -239,7 +231,7 @@
  @Test
  public void testInMemorySortedChunkSortUnsignedOnFlip() throws Exception
  {
    try(final BufferPool bufferPool = new BufferPool(1, 1024)) {
    try(final BufferPool bufferPool = new BufferPool(1, 1024, false)) {
      final Chunk chunk = new InMemorySortedChunk("test", bufferPool);
      populate(chunk, content(new String[][] {
        { new String(new byte[] { (byte) 0xFF }), "value0xFF" },
@@ -313,7 +305,7 @@
    final int NB_REGION = 10;
    final ByteString KEY = ByteString.valueOfUtf8("key");
    final File tempDir = TestCaseUtils.createTemporaryDirectory("testExternalSortChunk");
    try (final BufferPool bufferPool = new BufferPool(2, 4 + 4 + KEY.length() + 4 + 4))
    try (final BufferPool bufferPool = new BufferPool(2, 4 + 4 + KEY.length() + 4 + 4, false))
    {
      // 4: record offset, 4: key length, 4: value length, 4: value
      final ExternalSortChunk chunk =
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
@@ -955,7 +955,6 @@
      importConf.setIncludeBranches(Collections.singleton(testBaseDN));
      importConf.setSkipDNValidation(true);
      importConf.setThreadCount(0);
      importConf.setOffHeapSize(0); // Force heap buffer for automatic buffer scaling.
      backend.importLDIF(importConf, DirectoryServer.getInstance().getServerContext());
    }
    assertEquals(rejectedEntries.size(), 0,