From e3a3030cd14ba12631b8c50d955ec800b247fb72 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 10 Mar 2016 13:24:01 +0000
Subject: [PATCH] OPENDJ-2727: Low performance during import with large index-entry-limit

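With a large index-entry-limit, the phase two merge allocated an
index-entry-limit sized long[] for every merged key and decoded compact
variable-length integers for every record, making imports CPU and
allocation heavy.

This change:
* merges duplicate keys during phase one with a dedicated
  EntryIDsCollector which accumulates raw entry IDs and stops
  accumulating once the index entry limit is reached;
* lays out records in the in-memory sorted chunks with fixed 4-byte
  lengths instead of compact unsigned longs:
  [key length (int)][value length (int)][key bytes][value bytes];
* encodes the expected entry ID of an IndexBuffer record with
  ByteString.valueOfLong() instead of a single-element EntryIDSet;
* writes records larger than 32 KB directly to storage instead of
  queuing them;
* releases the phase one buffer pool as soon as phase one completes;
* makes the off-heap memory dedicated to the phase one buffers
  configurable through the --offHeapSize import option and the
  ds-task-import-offheap-size task attribute (700 MB by default);
* raises the import DB cache size from 4 MB to 32 MB.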
---
 opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java                     |    9 
 opendj-server-legacy/src/messages/org/opends/messages/backend.properties                             |    3 
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java     |  365 ++++++++++++++++++++++------------------
 opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java                           |   65 ++++--
 opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java |   12 
 opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java                     |   23 ++
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java             |    4 
 opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java                           |    4 
 opendj-server-legacy/resource/schema/02-config.ldif                                                  |    7 
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java              |   26 ++
 opendj-server-legacy/src/messages/org/opends/messages/tool.properties                                |    5 
 11 files changed, 325 insertions(+), 198 deletions(-)

diff --git a/opendj-server-legacy/resource/schema/02-config.ldif b/opendj-server-legacy/resource/schema/02-config.ldif
index a83b787..96051fb 100644
--- a/opendj-server-legacy/resource/schema/02-config.ldif
+++ b/opendj-server-legacy/resource/schema/02-config.ldif
@@ -3830,6 +3830,12 @@
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
   SINGLE-VALUE
   X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.159
+  NAME 'ds-task-import-offheap-size'
+  EQUALITY integerMatch
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDJ Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler'
   SUP top
@@ -4517,6 +4523,7 @@
         ds-task-import-is-encrypted $
         ds-task-import-backend-id $
         ds-task-import-thread-count $
+        ds-task-import-offheap-size $
         ds-task-import-clear-backend )
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.64
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
index 6a40020..62f7ff1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2014-2015 ForgeRock AS.
+ * Portions Copyright 2014-2016 ForgeRock AS.
  */
 package org.opends.server.backends.pluggable;
 
@@ -696,6 +696,30 @@
   }
 
   /**
+   * Copies the entry IDs contained in this {@link EntryIDSet} into the given
+   * array. No IDs are copied if this {@link EntryIDSet} is not
+   * defined.
+   *
+   * @param array
+   *          The array into which the entry IDs should be copied.
+   * @param offset
+   *          The offset within the array of the first ID to be written; must
+   *          be non-negative and no larger than array.length.
+   * @return The number of elements copied, or -1 if this {@link EntryIDSet} is
+   *         not defined.
+   */
+  public int copyTo(long[] array, int offset)
+  {
+    if (isDefined())
+    {
+      final long[] ids = concreteImpl.getIDs();
+      System.arraycopy(ids, 0, array, offset, ids.length);
+      return ids.length;
+    }
+    return -1;
+  }
+
+  /**
    * Returns this {@link EntryIDSet} as a long array holding all the entryIDs.
    *
    * @return a new long array containing all the entryIDs included in this {@link EntryIDSet}.
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
index 12271cc..ab2b865 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2014-2015 ForgeRock AS.
+ * Portions Copyright 2014-2016 ForgeRock AS.
  */
 package org.opends.server.backends.pluggable;
 
@@ -278,7 +278,7 @@
     {
       this.txn = txn;
       this.expectedEntryID = expectedEntryID;
-      this.encodedEntryID = CODEC_V2.encode(EntryIDSet.newDefinedSet(expectedEntryID.longValue()));
+      this.encodedEntryID = ByteString.valueOfLong(expectedEntryID.longValue());
     }
 
     @Override
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index c03316b..b3c6c31 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -176,8 +176,23 @@
       final int indexCount = getIndexCount();
 
       final int nbBuffer = threadCount * indexCount * 2;
-      final int bufferSize = computeBufferSize(nbBuffer);
-      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
+      final int bufferSize;
+      if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
+      {
+        final long offHeapSize = importConfig.getOffHeapSize();
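+        // Split the configured off-heap memory evenly across all phase one buffers.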
+        bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
+        if (bufferSize < MIN_BUFFER_SIZE)
+        {
+          // Not enough memory.
+          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
+        }
+        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
+      }
+      else
+      {
+        bufferSize = computeBufferSize(nbBuffer);
+        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
+      }
       logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
       logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
 
@@ -466,11 +481,6 @@
 
     private int computeBufferSize(int nbBuffer) throws InitializationException
     {
-      if (BufferPool.SUPPORTS_OFF_HEAP)
-      {
-        return MAX_BUFFER_SIZE;
-      }
-
       final long availableMemory = calculateAvailableMemory();
       logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
 
@@ -809,12 +819,14 @@
     }
   }
 
+  /** Default size, in megabytes, of the off-heap memory dedicated to the phase one buffers. */
+  private static final int DEFAULT_OFFHEAP_SIZE = 700;
   /** Max size of phase one buffer. */
   private static final int MAX_BUFFER_SIZE = 2 * MB;
   /** Min size of phase one buffer. */
   private static final int MIN_BUFFER_SIZE = 4 * KB;
   /** DB cache size to use during import. */
-  private static final int DB_CACHE_SIZE = 4 * MB;
+  private static final int DB_CACHE_SIZE = 32 * MB;
   /** Required free memory for this importer. */
   private static final int REQUIRED_FREE_MEMORY = 50 * MB;
   /** LDIF reader. */
@@ -855,7 +867,7 @@
           {
             try
             {
-              importStrategy.beforeImport(container);
+              importStrategy.beforePhaseOne(container);
             }
             finally
             {
@@ -878,6 +890,8 @@
       throw new InterruptedException("Import processing canceled.");
     }
 
+    importStrategy.afterPhaseOne();
+
     // Start phase two
     final long phaseTwoStartTime = System.currentTimeMillis();
     try (final PhaseTwoProgressReporter progressReporter = new PhaseTwoProgressReporter())
@@ -895,7 +909,7 @@
     // Finish import
     for(EntryContainer entryContainer : importedContainers.keySet())
     {
-      importStrategy.afterImport(entryContainer);
+      importStrategy.afterPhaseTwo(entryContainer);
     }
     phaseTwoTimeMs = System.currentTimeMillis() - phaseTwoStartTime;
   }
@@ -951,15 +965,20 @@
 
     abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
 
-    void beforeImport(EntryContainer entryContainer)
+    void beforePhaseOne(EntryContainer entryContainer)
     {
       entryContainer.delete(asWriteableTransaction(importer));
       visitIndexes(entryContainer, setTrust(false, importer));
     }
 
+    void afterPhaseOne()
+    {
+      closeSilently(bufferPool);
+    }
+
     abstract Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk source, PhaseTwoProgressReporter progressReporter);
 
-    void afterImport(EntryContainer entryContainer)
+    void afterPhaseTwo(EntryContainer entryContainer)
     {
       visitIndexes(entryContainer, setTrust(true, importer));
     }
@@ -967,7 +986,8 @@
     final Chunk newExternalSortChunk(TreeName treeName) throws Exception
     {
       return new ExternalSortChunk(tempDir, treeName.toString(), bufferPool,
-          newCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
+          newPhaseOneCollector(entryContainers.get(treeName.getBaseDN()), treeName),
+          newPhaseTwoCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
     }
 
     final Callable<Void> newChunkCopierTask(TreeName treeName, final Chunk source,
@@ -983,7 +1003,7 @@
       final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
 
       return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), source,
-          id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
+          id2count, newPhaseTwoCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
     }
 
     final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source,
@@ -1167,14 +1187,14 @@
     }
 
     @Override
-    void beforeImport(EntryContainer entryContainer)
+    void beforePhaseOne(EntryContainer entryContainer)
     {
       visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, setTrust(false, importer)));
       visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, deleteDatabase(importer)));
     }
 
     @Override
-    void afterImport(EntryContainer entryContainer)
+    void afterPhaseTwo(EntryContainer entryContainer)
     {
       visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, setTrust(true, importer)));
     }
@@ -1395,12 +1415,12 @@
     /** Provides buffer used to store and sort chunk of data. */
     private final BufferPool bufferPool;
     /** File containing the regions used to store the data. */
-    private final File file;
     private final FileChannel channel;
     /** Pointer to the next available region in the file, typically at end of file. */
     private final AtomicLong filePosition = new AtomicLong();
     /** Collector used to reduce the number of duplicate keys during sort. */
-    private final Collector<?, ByteString> deduplicator;
+    private final Collector<?, ByteString> phaseOneDeduplicator;
+    private final Collector<?, ByteString> phaseTwoDeduplicator;
     /** Keep track of pending sorting tasks. */
     private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
     /** Keep track of currently opened chunks. */
@@ -1419,14 +1439,15 @@
       }
     };
 
-    ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector,
-        Executor sortExecutor) throws IOException
+    ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> phaseOneDeduplicator,
+        Collector<?, ByteString> phaseTwoDeduplicator, Executor sortExecutor) throws IOException
     {
       this.name = name;
       this.bufferPool = bufferPool;
-      this.deduplicator = collector;
-      this.file = new File(tempDir, name.replaceAll("\\W+", "_") + "_" + UUID.randomUUID().toString());
-      this.channel = open(this.file.toPath(), CREATE, TRUNCATE_EXISTING, READ, WRITE, DELETE_ON_CLOSE);
+      this.phaseOneDeduplicator = phaseOneDeduplicator;
+      this.phaseTwoDeduplicator = phaseTwoDeduplicator;
+      final File file = new File(tempDir, name.replaceAll("\\W+", "_") + "_" + UUID.randomUUID().toString());
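+      // Open the chunk file as sparse so that reserved but not-yet-written regions do not consume disk space.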
+      this.channel = open(file.toPath(), CREATE_NEW, SPARSE, READ, WRITE);
       this.sorter = new ExecutorCompletionService<>(sortExecutor);
     }
 
@@ -1478,7 +1499,7 @@
                 }
                 closeSilently(channel);
               }
-            }, (Collector<?, ByteString>) deduplicator);
+            }, (Collector<?, ByteString>) phaseTwoDeduplicator);
       }
       catch (ExecutionException | InterruptedException e)
       {
@@ -1520,7 +1541,7 @@
            */
           final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
           try (final SequentialCursor<ByteString, ByteString> source =
-              new CollectorCursor<>(chunk.flip(), deduplicator))
+              new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
           {
             copyIntoChunk(source, persistentChunk);
           }
@@ -1553,8 +1574,6 @@
      */
     static final class InMemorySortedChunk implements Chunk, Comparator<Integer>
     {
-      private static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
-
       private final String metricName;
       private final BufferPool bufferPool;
       private final Buffer buffer;
@@ -1574,13 +1593,11 @@
       @Override
       public boolean put(ByteSequence key, ByteSequence value)
       {
-        final int keyHeaderSize = PackedLong.getEncodedSize(key.length());
-        final int valueHeaderSize = PackedLong.getEncodedSize(value.length());
-        final int keyRecordSize = keyHeaderSize + key.length();
-        final int recordSize = keyRecordSize + valueHeaderSize + value.length();
+        final int keyRecordSize = INT_SIZE + key.length();
+        final int recordSize = keyRecordSize + INT_SIZE + value.length();
 
         dataPos -= recordSize;
-        final int recordDataPos = dataPos;
+        int recordDataPos = dataPos;
 
         final int recordIndexPos = indexPos;
         indexPos += INT_SIZE;
@@ -1596,19 +1613,18 @@
 
         // Write record offset
         buffer.writeInt(recordIndexPos, recordDataPos);
-        final int valuePos = writeDataAt(recordDataPos, key);
-        writeDataAt(valuePos, value);
+
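+        // Record layout: key length (int), value length (int), key bytes, value bytes.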
+        buffer.writeInt(recordDataPos, key.length());
+        recordDataPos += INT_SIZE;
+        buffer.writeInt(recordDataPos, value.length());
+        recordDataPos += INT_SIZE;
+        buffer.writeByteSequence(recordDataPos, key);
+        recordDataPos += key.length();
+        buffer.writeByteSequence(recordDataPos, value);
 
         return true;
       }
 
-      private int writeDataAt(int offset, ByteSequence data)
-      {
-        final int headerSize = buffer.writeCompactUnsignedLong(offset, data.length());
-        buffer.writeByteSequence(offset + headerSize, data);
-        return offset + headerSize + data.length();
-      }
-
       @Override
       public long size()
       {
@@ -1660,11 +1676,10 @@
           return 0;
         }
         // Compare Keys
-        final int keyLengthA = (int) buffer.readCompactUnsignedLong(iOffsetA);
-        final int keyOffsetA = iOffsetA + PackedLong.getEncodedSize(keyLengthA);
-
-        final int keyLengthB = (int) buffer.readCompactUnsignedLong(iOffsetB);
-        final int keyOffsetB = iOffsetB + PackedLong.getEncodedSize(keyLengthB);
+        final int keyLengthA = buffer.readInt(iOffsetA);
+        final int keyLengthB = buffer.readInt(iOffsetB);
+        final int keyOffsetA = iOffsetA + 2 * INT_SIZE;
+        final int keyOffsetB = iOffsetB + 2 * INT_SIZE;
 
         return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB);
       }
@@ -1685,19 +1700,19 @@
             key = value = null;
             return false;
           }
-          final int recordOffset = buffer.readInt(indexOffset);
+          int recordOffset = buffer.readInt(indexOffset);
 
-          final int keyLength = (int) buffer.readCompactUnsignedLong(recordOffset);
-          final int keyHeaderSize = PackedLong.getEncodedSize(keyLength);
-          key = buffer.readByteString(recordOffset + keyHeaderSize, keyLength);
+          final int keyLength = buffer.readInt(recordOffset);
+          recordOffset += INT_SIZE;
+          final int valueLength = buffer.readInt(recordOffset);
+          recordOffset += INT_SIZE;
 
-          final int valueOffset = recordOffset + keyHeaderSize + keyLength;
-          final int valueLength = (int) buffer.readCompactUnsignedLong(valueOffset);
-          final int valueHeaderSize = PackedLong.getEncodedSize(valueLength);
-          value = buffer.readByteString(valueOffset + valueHeaderSize, valueLength);
+          key = buffer.readByteString(recordOffset, keyLength);
+          recordOffset += keyLength;
+          value = buffer.readByteString(recordOffset, valueLength);
 
           indexOffset += INT_SIZE;
-          bytesRead += keyHeaderSize + keyLength + valueHeaderSize + valueLength;
+          bytesRead += (2 * INT_SIZE) + keyLength + valueLength;
 
           return true;
         }
@@ -1833,11 +1848,6 @@
       public MeteredCursor<ByteString, ByteString> flip()
       {
         size = mmapBuffer.position();
-        /*
-         * We force OS to write dirty pages now so that they don't accumulate. Indeed, huge number of dirty pages might
-         * cause the OS to freeze the producer of those dirty pages (this importer) while it is swapping-out the pages.
-         */
-        mmapBuffer.force();
         mmapBuffer = null;
         return new FileRegionChunkCursor(startOffset, size);
       }
@@ -2297,7 +2307,7 @@
     {
       final Chunk id2CountChunk =
           new ExternalSortChunk(tempDir, id2count.getName().toString(), bufferPool, id2countCollector,
-              sameThreadExecutor());
+              id2countCollector, sameThreadExecutor());
       long totalNumberOfEntries = 0;
 
       final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
@@ -2471,7 +2481,13 @@
      * threads which will access the put() method. If underestimated, {@link #put(ByteSequence, ByteSequence)} might
      * lead to unordered copy. If overestimated, extra memory is wasted.
      */
-    private static final int QUEUE_SIZE = 1024;
+    private static final int QUEUE_SIZE = 128;
+
+    /**
+     * Maximum size of a queued entry. Entries beyond this size are not queued but written directly to the storage
+     * in order to limit the heap memory required for the import.
+     */
+    private static final int ENTRY_MAX_SIZE = 32 * KB;
 
     private final NavigableMap<ByteSequence, ByteSequence> pendingRecords = new TreeMap<>();
     private final int queueSize;
@@ -2486,6 +2502,11 @@
     @Override
     public synchronized boolean put(ByteSequence key, ByteSequence value)
     {
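+      // Oversized records bypass the queue and are written directly to the delegate chunk.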
+      if ((key.length() + value.length()) >= ENTRY_MAX_SIZE)
+      {
+        return delegate.put(key, value);
+      }
+
       pendingRecords.put(key, value);
       if (pendingRecords.size() == queueSize)
       {
@@ -2700,12 +2721,8 @@
 
     int readInt(int position);
 
-    long readCompactUnsignedLong(int position);
-
     ByteString readByteString(int position, int length);
 
-    int writeCompactUnsignedLong(int position, long value);
-
     void writeByteSequence(int position, ByteSequence data);
 
     int length();
@@ -2808,14 +2825,6 @@
       private final long address;
       private final int size;
       private int position;
-      private final InputStream asInputStream = new InputStream()
-      {
-        @Override
-        public int read() throws IOException
-        {
-          return UNSAFE.getByte(address + position++) & 0xFF;
-        }
-      };
       private final OutputStream asOutputStream = new OutputStream()
       {
         @Override
@@ -2823,6 +2832,18 @@
         {
           UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
         }
+
+        @Override
+        public void write(byte[] b) throws IOException
+        {
+          UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
+          position += b.length;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+          UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
+          position += len;
+        }
       };
       private boolean closed;
 
@@ -2845,42 +2866,17 @@
       }
 
       @Override
-      public int writeCompactUnsignedLong(final int position, long value)
+      public void writeByteSequence(final int position, ByteSequence data)
       {
-        try
-        {
-          this.position = position;
-          return PackedLong.writeCompactUnsigned(asOutputStream, value);
-        }
-        catch (IOException e)
-        {
-          throw new StorageRuntimeException(e);
-        }
-      }
-
-      @Override
-      public long readCompactUnsignedLong(final int position)
-      {
+        Reject.ifFalse(position + data.length() <= size);
         this.position = position;
         try
         {
-          return PackedLong.readCompactUnsignedLong(asInputStream);
+          data.copyTo(asOutputStream);
         }
-        catch (IOException e)
+        catch (IOException e)
         {
-          throw new IllegalStateException(e);
-        }
-      }
-
-      @Override
-      public void writeByteSequence(int position, ByteSequence data)
-      {
-        Reject.ifFalse(position + data.length() <= size);
-
-        long offset = address + position;
-        for(int i = 0 ; i < data.length() ; i++)
-        {
-          UNSAFE.putByte(offset++, data.byteAt(i));
+          throw new StorageRuntimeException(e);
         }
       }
 
@@ -2931,23 +2927,6 @@
     static final class HeapBuffer implements Buffer
     {
       private final ByteBuffer buffer;
-      private final OutputStream asOutputStream = new OutputStream()
-      {
-
-        @Override
-        public void write(int b) throws IOException
-        {
-          buffer.put((byte) (b & 0xFF));
-        }
-      };
-      private final InputStream asInputStream = new InputStream()
-      {
-        @Override
-        public int read() throws IOException
-        {
-          return buffer.get() & 0xFF;
-        }
-      };
 
       HeapBuffer(int size)
       {
@@ -2967,34 +2946,6 @@
       }
 
       @Override
-      public int writeCompactUnsignedLong(final int position, long value)
-      {
-        buffer.position(position);
-        try
-        {
-          return PackedLong.writeCompactUnsigned(asOutputStream, value);
-        }
-        catch (IOException e)
-        {
-          throw new StorageRuntimeException(e);
-        }
-      }
-
-      @Override
-      public long readCompactUnsignedLong(final int position)
-      {
-        buffer.position(position);
-        try
-        {
-          return PackedLong.readCompactUnsignedLong(asInputStream);
-        }
-        catch (IOException e)
-        {
-          throw new IllegalArgumentException(e);
-        }
-      }
-
-      @Override
       public void writeByteSequence(int position, ByteSequence data)
       {
         buffer.position(position);
@@ -3065,7 +3016,8 @@
   * Get a new {@link Collector} which can be used to merge encoded values. The type of the values to merge is
   * deduced from the {@link TreeName}.
    */
-  private static Collector<?, ByteString> newCollector(final EntryContainer entryContainer, final TreeName treeName)
+  private static Collector<?, ByteString> newPhaseTwoCollector(final EntryContainer entryContainer,
+      final TreeName treeName)
   {
     final DefaultIndex index = getIndex(entryContainer, treeName);
     if (index != null)
@@ -3086,6 +3038,18 @@
     throw new IllegalArgumentException("Unknown tree: " + treeName);
   }
 
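+  /**
+   * Get a new {@link Collector} to merge values during phase one where, for default indexes, values are individual
+   * encoded entry IDs rather than encoded {@link EntryIDSet}s.
+   */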
+  private static Collector<?, ByteString> newPhaseOneCollector(final EntryContainer entryContainer,
+      final TreeName treeName)
+  {
+    final DefaultIndex index = getIndex(entryContainer, treeName);
+    if (index != null)
+    {
+      // key conflicts == merge individual entry IDs into an EntryIDSet
+      return new EntryIDsCollector(index);
+    }
+    return newPhaseTwoCollector(entryContainer, treeName);
+  }
+
   private static boolean isDN2ID(TreeName treeName)
   {
     return SuffixContainer.DN2ID_INDEX_NAME.equals(treeName.getIndexId());
@@ -3212,6 +3176,80 @@
   }
 
   /**
+   * {@link Collector} that accepts individual encoded entry IDs and produces
+   * a {@link ByteString} representing the merged {@link EntryIDSet}.
+   */
+  static final class EntryIDsCollector implements Collector<LongArray, ByteString>
+  {
+    private final DefaultIndex index;
+    private final int indexLimit;
+
+    EntryIDsCollector(DefaultIndex index)
+    {
+      this.index = index;
+      this.indexLimit = index.getIndexEntryLimit();
+    }
+
+    @Override
+    public LongArray get()
+    {
+      return new LongArray();
+    }
+
+    @Override
+    public LongArray accept(LongArray resultContainer, ByteString value)
+    {
+      if (resultContainer.size() < indexLimit)
+      {
+        resultContainer.add(value.toLong());
+      }
+      /*
+       * else the EntryIDSet is above the index entry limit: discard additional
+       * values to avoid blowing up memory now, then discard all entries in merge()
+       */
+      return resultContainer;
+    }
+
+    @Override
+    public ByteString merge(LongArray resultContainer)
+    {
+      if (resultContainer.size() >= indexLimit)
+      {
+        return index.toValue(EntryIDSet.newUndefinedSet());
+      }
+      return index.toValue(EntryIDSet.newDefinedSet(resultContainer.get()));
+    }
+  }
+
+  /** Simple long array primitive wrapper. */
+  private static final class LongArray
+  {
+    private long[] values = new long[16];
+    private int size;
+
+    void add(long value)
+    {
+      if (size == values.length)
+      {
+        values = Arrays.copyOf(values, values.length * 2);
+      }
+      values[size++] = value;
+    }
+
+    int size()
+    {
+      return size;
+    }
+
+    long[] get()
+    {
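+      // Trim to the logical size and sort, as EntryIDSet.newDefinedSet() expects an ordered array.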
+      values = Arrays.copyOf(values, size);
+      Arrays.sort(values);
+      return values;
+    }
+  }
+
+  /**
    * {@link Collector} that accepts encoded {@link EntryIDSet} objects and produces a {@link ByteString} representing
    * the merged {@link EntryIDSet}.
    */
@@ -3264,27 +3302,26 @@
 
     private EntryIDSet buildEntryIDSet(Collection<ByteString> encodedIDSets)
     {
-      final long[] entryIDs = new long[indexLimit];
-
-      // accumulate in array
-      int i = 0;
-      for (ByteString encodedIDSet : encodedIDSets)
-      {
+      final List<EntryIDSet> idSets = new ArrayList<>(encodedIDSets.size());
+      int mergedSize = 0;
+      for (ByteString encodedIDSet : encodedIDSets)
+      {
         final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), encodedIDSet);
-        if (!entryIDSet.isDefined() || i + entryIDSet.size() >= indexLimit)
+        mergedSize += entryIDSet.size();
+        if (!entryIDSet.isDefined() || mergedSize >= indexLimit)
         {
           // above index entry limit
           return EntryIDSet.newUndefinedSet();
         }
-
-        for (EntryID entryID : entryIDSet)
-        {
-          entryIDs[i++] = entryID.longValue();
-        }
+        idSets.add(entryIDSet);
       }
 
-      Arrays.sort(entryIDs, 0, i);
-      return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i));
+      final long[] entryIDs = new long[mergedSize];
+      int offset = 0;
+      for (EntryIDSet idSet : idSets)
+      {
+        offset += idSet.copyTo(entryIDs, offset);
+      }
+      Arrays.sort(entryIDs);
+      return EntryIDSet.newDefinedSet(entryIDs);
     }
   }
 
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java b/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
index 69e898c..736660f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2015 ForgeRock AS.
+ * Portions copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.config;
 
@@ -3829,6 +3829,13 @@
        NAME_PREFIX_TASK + "import-thread-count";
 
   /**
+   * The name of the attribute in an import task definition that specifies the
+   * off-heap memory size, in megabytes, to use during the import.
+   */
+  public static final String ATTR_IMPORT_OFFHEAP_SIZE =
+       NAME_PREFIX_TASK + "import-offheap-size";
+
+  /**
    * The name of the attribute in an import task definition that specifies
    * whether the import process should append to the existing database rather
    * than overwriting it.
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java b/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
index d55a6eb..04da0fb 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
@@ -95,6 +95,7 @@
   private boolean skipDNValidation;
   private String tmpDirectory;
   private int threadCount;
+  private int offHeapSize;
   private String backendID;
   private String rejectFile;
   private String skipFile;
@@ -155,6 +156,7 @@
     AttributeType typeClearBackend = getAttributeType(ATTR_IMPORT_CLEAR_BACKEND);
     AttributeType typeRandomSeed = getAttributeType(ATTR_IMPORT_RANDOM_SEED);
     AttributeType typeThreadCount = getAttributeType(ATTR_IMPORT_THREAD_COUNT);
+    AttributeType typeOffHeapSize = getAttributeType(ATTR_IMPORT_OFFHEAP_SIZE);
     AttributeType typeTmpDirectory = getAttributeType(ATTR_IMPORT_TMP_DIRECTORY);
     AttributeType typeDNCheckPhase2 = getAttributeType(ATTR_IMPORT_SKIP_DN_VALIDATION);
 
@@ -210,6 +212,7 @@
     clearBackend = asBoolean(taskEntry, typeClearBackend);
     randomSeed = asInt(taskEntry, typeRandomSeed);
     threadCount = asInt(taskEntry, typeThreadCount);
+    offHeapSize = asInt(taskEntry, typeOffHeapSize);
 
     // Make sure that either the "includeBranchStrings" argument or the
     // "backendID" argument was provided.
@@ -590,6 +593,7 @@
     importConfig.setSkipDNValidation(skipDNValidation);
     importConfig.setTmpDirectory(tmpDirectory);
     importConfig.setThreadCount(threadCount);
+    importConfig.setOffHeapSize(offHeapSize);
 
     // FIXME -- Should this be conditional?
     importConfig.setInvokeImportPlugins(true);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java b/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
index db7d499..3f6132e 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
@@ -161,6 +161,7 @@
   private StringArgument  templateFile;
   private BooleanArgument skipDNValidation;
   private IntegerArgument threadCount;
+  private IntegerArgument offHeapSize;
   private StringArgument  tmpDirectory;
 
   private int process(String[] args, boolean initializeServer,
@@ -397,6 +398,13 @@
                       .defaultValue(0)
                       .valuePlaceholder(INFO_LDIFIMPORT_THREAD_COUNT_PLACEHOLDER.get())
                       .buildAndAddToParser(argParser);
+      offHeapSize =
+              IntegerArgument.builder("offHeapSize")
+                      .description(INFO_LDIFIMPORT_DESCRIPTION_OFFHEAP_SIZE.get())
+                      .lowerBound(0)
+                      .defaultValue(700)
+                      .valuePlaceholder(INFO_LDIFIMPORT_OFFHEAP_SIZE_PLACEHOLDER.get())
+                      .buildAndAddToParser(argParser);
       tmpDirectory =
               StringArgument.builder("tmpdirectory")
                       .description(INFO_LDIFIMPORT_DESCRIPTION_TEMP_DIRECTORY.get())
@@ -437,6 +445,7 @@
     addAttribute(attributes, ATTR_IMPORT_TEMPLATE_FILE, templateFile.getValue());
     addAttribute(attributes, ATTR_IMPORT_RANDOM_SEED, randomSeed.getValue());
     addAttribute(attributes, ATTR_IMPORT_THREAD_COUNT, threadCount.getValue());
+    addAttribute(attributes, ATTR_IMPORT_OFFHEAP_SIZE, offHeapSize.getValue());
 
     // Optional attributes
     addAttribute2(attributes, ATTR_IMPORT_BACKEND_ID, backendID);
@@ -932,30 +941,40 @@
     }
 
 
-      // Create the LDIF import configuration to use when reading the LDIF.
-      importConfig.setCompressed(isCompressed.isPresent());
-      importConfig.setClearBackend(clearBackend.isPresent());
-      importConfig.setEncrypted(isEncrypted.isPresent());
-      importConfig.setExcludeAttributes(excludeAttributes);
-      importConfig.setExcludeBranches(excludeBranches);
-      importConfig.setExcludeFilters(excludeFilters);
-      importConfig.setIncludeAttributes(includeAttributes);
-      importConfig.setIncludeBranches(includeBranches);
-      importConfig.setIncludeFilters(includeFilters);
-      importConfig.setValidateSchema(!skipSchemaValidation.isPresent());
-      importConfig.setSkipDNValidation(skipDNValidation.isPresent());
-      importConfig.setTmpDirectory(tmpDirectory.getValue());
+    // Create the LDIF import configuration to use when reading the LDIF.
+    importConfig.setCompressed(isCompressed.isPresent());
+    importConfig.setClearBackend(clearBackend.isPresent());
+    importConfig.setEncrypted(isEncrypted.isPresent());
+    importConfig.setExcludeAttributes(excludeAttributes);
+    importConfig.setExcludeBranches(excludeBranches);
+    importConfig.setExcludeFilters(excludeFilters);
+    importConfig.setIncludeAttributes(includeAttributes);
+    importConfig.setIncludeBranches(includeBranches);
+    importConfig.setIncludeFilters(includeFilters);
+    importConfig.setValidateSchema(!skipSchemaValidation.isPresent());
+    importConfig.setSkipDNValidation(skipDNValidation.isPresent());
+    importConfig.setTmpDirectory(tmpDirectory.getValue());
 
-      try
-      {
-          importConfig.setThreadCount(threadCount.getIntValue());
-      }
-      catch(Exception e)
-      {
-          logger.error(ERR_LDIFIMPORT_CANNOT_PARSE_THREAD_COUNT,
-                  threadCount.getValue(), e.getMessage());
-          return 1;
-      }
+    try
+    {
+      importConfig.setThreadCount(threadCount.getIntValue());
+    }
+    catch (Exception e)
+    {
+      logger.error(ERR_LDIFIMPORT_CANNOT_PARSE_THREAD_COUNT,
+          threadCount.getValue(), e.getMessage());
+      return 1;
+    }
+
+    try
+    {
+      importConfig.setOffHeapSize(offHeapSize.getIntValue());
+    }
+    catch (Exception e)
+    {
+      logger.error(ERR_LDIFIMPORT_CANNOT_PARSE_OFFHEAP_SIZE, offHeapSize.getValue(), e.getMessage());
+      return 1;
+    }
 
     importConfig.setBufferSize(LDIF_BUFFER_SIZE);
     importConfig.setExcludeAllUserAttributes(excludeAllUserAttributes);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java b/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
index 5997e7b..c40b25c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
@@ -110,6 +110,9 @@
   private boolean skipDNValidation;
   private int threadCount;
 
+  /** Indicates the memory size, in megabytes, to use for off-heap buffers. */
+  private int offHeapSize;
+
 
   /**
    * Creates a new LDIF import configuration that will read from the
@@ -1062,6 +1065,26 @@
 
 
   /**
+   * Set the memory size available for off-heap buffers.
+   *
+   * @param sizeInMb The available memory size, expressed in megabytes.
+   */
+  public void setOffHeapSize(int sizeInMb)
+  {
+    this.offHeapSize = sizeInMb;
+  }
+
+  /**
+   * Get the memory size available for off-heap buffers.
+   *
+   * @return The memory size in megabytes.
+   */
+  public int getOffHeapSize()
+  {
+    return offHeapSize;
+  }
+
+  /**
    * Set the thread count.
    *
    * @param c The thread count value.
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/backend.properties b/opendj-server-legacy/src/messages/org/opends/messages/backend.properties
index 4e8fec8..b208b23 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/backend.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/backend.properties
@@ -1078,3 +1078,6 @@
 children for DN <%s> (got %d, expecting %d)
 ERR_VERIFY_ID2COUNT_WRONG_ID_597=File id2ChildrenCount references non-existing EntryID <%d>.
 NOTE_REBUILD_NOTHING_TO_REBUILD_598=Rebuilding index finished: no indexes to rebuild.
+NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO_520=Setting DB cache size to %d bytes. \
+ Using %d MB of off-heap memory through %d phase one buffers of %d KB each.
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/tool.properties b/opendj-server-legacy/src/messages/org/opends/messages/tool.properties
index 342afe5..70613b5 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/tool.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/tool.properties
@@ -2460,6 +2460,11 @@
 INFO_INDEX_NAME_PLACEHOLDER_1894={indexName}
 INFO_DESCRIPTION_BACKEND_DEBUG_RAW_DB_NAME_1895=The raw database name
 INFO_CHANGE_NUMBER_PLACEHOLDER_1896={change number}
+ERR_LDIFIMPORT_CANNOT_PARSE_OFFHEAP_SIZE_1897=The value %s for \
+offHeapSize cannot be parsed: %s
+INFO_LDIFIMPORT_DESCRIPTION_OFFHEAP_SIZE_1898=Size in megabytes of the off-heap memory dedicated to the \
+phase one buffers.
+INFO_LDIFIMPORT_OFFHEAP_SIZE_PLACEHOLDER_1899={size in megabytes}
 
 # Upgrade tasks
 INFO_UPGRADE_TASK_6869_SUMMARY_10000=Fixing de-DE collation matching rule OID
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index dd694f8..053c832 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -92,13 +92,9 @@
 
     buffer.writeByteSequence(0, binary);
     buffer.writeInt(4, 1234);
-    buffer.writeCompactUnsignedLong(8, 42);
-    buffer.writeCompactUnsignedLong(9, 0xFFFF);
 
     assertThat(buffer.readByteString(0, 4)).isEqualTo(binary);
     assertThat(buffer.readInt(4)).isEqualTo(1234);
-    assertThat(buffer.readCompactUnsignedLong(8)).isEqualTo(42);
-    assertThat(buffer.readCompactUnsignedLong(9)).isEqualTo(0xFFFF);
   }
 
   @Test
@@ -314,10 +310,12 @@
     final int NB_REGION = 10;
     final ByteString KEY = ByteString.valueOfUtf8("key");
     final File tempDir = TestCaseUtils.createTemporaryDirectory("testExternalSortChunk");
-    try(final BufferPool bufferPool = new BufferPool(2, 4 + 1 + KEY.length() + 1 + 4)) {
-      // 4: record offset, 1: key length, 1: value length, 4: value
+    try (final BufferPool bufferPool = new BufferPool(2, 4 + 4 + KEY.length() + 4 + 4))
+    {
+      // 4: record offset, 4: key length, 4: value length, 4: value
       final ExternalSortChunk chunk =
-          new ExternalSortChunk(tempDir, "test", bufferPool, StringConcatCollector.INSTANCE, new ForkJoinPool());
+          new ExternalSortChunk(tempDir, "test", bufferPool, StringConcatCollector.INSTANCE,
+              StringConcatCollector.INSTANCE, new ForkJoinPool());
 
       List<ByteString> expected = new ArrayList<>(NB_REGION);
       for (int i = 0; i < NB_REGION; i++)

--
Gitblit v1.10.0